Merge branch 'szhou/feature/multiwaymerge' of github.com:taosdata/TDengine into szhou/feature/multiwaymerge
This commit is contained in:
commit
396a8a6b2e
|
@ -177,8 +177,8 @@ typedef struct {
|
|||
typedef struct SField {
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
uint8_t type;
|
||||
int32_t bytes;
|
||||
int8_t flags;
|
||||
int32_t bytes;
|
||||
} SField;
|
||||
|
||||
typedef struct SRetention {
|
||||
|
@ -1126,8 +1126,8 @@ typedef struct {
|
|||
STableMetaRsp* pMeta;
|
||||
} SMAlterStbRsp;
|
||||
|
||||
int32_t tEncodeSMAlterStbRsp(SEncoder *pEncoder, const SMAlterStbRsp *pRsp);
|
||||
int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp);
|
||||
int32_t tEncodeSMAlterStbRsp(SEncoder* pEncoder, const SMAlterStbRsp* pRsp);
|
||||
int32_t tDecodeSMAlterStbRsp(SDecoder* pDecoder, SMAlterStbRsp* pRsp);
|
||||
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
|
||||
|
||||
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
||||
|
@ -1982,6 +1982,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
SClientHbKey connKey;
|
||||
int64_t clusterId;
|
||||
SQueryHbReqBasic* query;
|
||||
SHashObj* info; // hash<Skv.key, Skv>
|
||||
} SClientHbReq;
|
||||
|
@ -2318,7 +2319,7 @@ typedef struct {
|
|||
int64_t sliding;
|
||||
char* expr; // sma expression
|
||||
char* tagsFilter;
|
||||
SVgEpSet vgEpSet[];
|
||||
SVgEpSet* pVgEpSet;
|
||||
} STSma; // Time-range-wise SMA
|
||||
|
||||
typedef STSma SVCreateTSmaReq;
|
||||
|
|
|
@ -68,6 +68,7 @@ typedef struct SCatalogReq {
|
|||
SArray *pIndex; // element is index name
|
||||
SArray *pUser; // element is SUserAuthInfo
|
||||
bool qNodeRequired; // valid qnode
|
||||
bool forceUpdate;
|
||||
} SCatalogReq;
|
||||
|
||||
typedef struct SMetaData {
|
||||
|
@ -280,7 +281,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
|
|||
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
|
||||
|
||||
|
||||
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId);
|
||||
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -222,7 +222,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
|||
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
|
||||
|
||||
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
|
||||
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
|
||||
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||
|
||||
#define REQUEST_MAX_TRY_TIMES 1
|
||||
|
||||
|
|
|
@ -104,6 +104,8 @@ void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param
|
|||
|
||||
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
|
||||
|
||||
void schedulerStopQueryHb(void *pTrans);
|
||||
|
||||
|
||||
/**
|
||||
* Cancel query job
|
||||
|
|
|
@ -235,6 +235,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03AC)
|
||||
#define TSDB_CODE_MND_COLUMN_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AD)
|
||||
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
|
||||
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
|
||||
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
|
||||
|
||||
// mnode-infoSchema
|
||||
|
|
|
@ -57,11 +57,6 @@ enum {
|
|||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
typedef struct {
|
||||
void* param;
|
||||
SClientHbReq* req;
|
||||
} SHbConnInfo;
|
||||
|
||||
typedef struct {
|
||||
char* key;
|
||||
// statistics
|
||||
|
@ -71,11 +66,8 @@ typedef struct {
|
|||
int64_t startTime;
|
||||
// ctl
|
||||
SRWLatch lock; // lock is used in serialization
|
||||
// connection
|
||||
SAppInstInfo* pAppInstInfo;
|
||||
// info
|
||||
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
|
||||
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
|
||||
} SAppHbMgr;
|
||||
|
||||
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
|
||||
|
@ -325,8 +317,6 @@ void appHbMgrCleanup(void);
|
|||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
|
||||
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
|
||||
|
||||
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
|
||||
|
||||
// --- mq
|
||||
void hbMgrInitMqHbRspHandle();
|
||||
|
||||
|
|
|
@ -130,8 +130,13 @@ void destroyTscObj(void *pObj) {
|
|||
|
||||
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
|
||||
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
|
||||
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
closeAllRequests(pTscObj->pRequests);
|
||||
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
|
||||
if (0 == connNum) {
|
||||
// TODO
|
||||
//closeTransporter(pTscObj);
|
||||
}
|
||||
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
||||
taosThreadMutexDestroy(&pTscObj->mutex);
|
||||
taosMemoryFreeClear(pTscObj);
|
||||
|
@ -223,6 +228,10 @@ static void doDestroyRequest(void *p) {
|
|||
|
||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
schedulerFreeJob(pRequest->body.queryJob);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pRequest->msgBuf);
|
||||
taosMemoryFreeClear(pRequest->sqlstr);
|
||||
taosMemoryFreeClear(pRequest->pDb);
|
||||
|
@ -230,10 +239,6 @@ static void doDestroyRequest(void *p) {
|
|||
doFreeReqResultInfo(&pRequest->body.resInfo);
|
||||
qDestroyQueryPlan(pRequest->body.pDag);
|
||||
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
schedulerFreeJob(pRequest->body.queryJob);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pRequest->tableList);
|
||||
taosArrayDestroy(pRequest->dbList);
|
||||
|
||||
|
|
|
@ -129,9 +129,9 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
}
|
||||
|
||||
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == info) {
|
||||
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
|
||||
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == pReq) {
|
||||
tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
|
||||
pRsp->connKey.connType);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -181,12 +181,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -199,12 +198,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -217,12 +215,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -547,14 +544,11 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
|
||||
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
|
||||
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
|
||||
if (info) {
|
||||
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
|
||||
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq);
|
||||
if (code) {
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
//hbClearClientHbReq(pOneReq);
|
||||
|
||||
|
@ -569,23 +563,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
return pBatchReq;
|
||||
}
|
||||
|
||||
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
|
||||
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
|
||||
while (pIter != NULL) {
|
||||
SClientHbReq *pOneReq = pIter;
|
||||
|
||||
tFreeReqKvHash(pOneReq->info);
|
||||
taosHashClear(pOneReq->info);
|
||||
|
||||
if (pOneReq->query) {
|
||||
taosArrayDestroy(pOneReq->query->queryDesc);
|
||||
taosMemoryFreeClear(pOneReq->query);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
void hbThreadFuncUnexpectedStopped(void) {
|
||||
atomic_store_8(&clientHbMgr.threadStop, 2);
|
||||
}
|
||||
|
@ -715,14 +692,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
|
|||
}
|
||||
|
||||
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
|
||||
// init getInfoFunc
|
||||
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
|
||||
if (pAppHbMgr->connInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pAppHbMgr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&clientHbMgr.lock);
|
||||
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
|
||||
|
@ -745,15 +714,6 @@ void appHbMgrCleanup(void) {
|
|||
taosHashCleanup(pTarget->activeInfo);
|
||||
pTarget->activeInfo = NULL;
|
||||
|
||||
pIter = taosHashIterate(pTarget->connInfo, NULL);
|
||||
while (pIter != NULL) {
|
||||
SHbConnInfo *info = pIter;
|
||||
taosMemoryFree(info->param);
|
||||
pIter = taosHashIterate(pTarget->connInfo, pIter);
|
||||
}
|
||||
taosHashCleanup(pTarget->connInfo);
|
||||
pTarget->connInfo = NULL;
|
||||
|
||||
taosMemoryFree(pTarget->key);
|
||||
taosMemoryFree(pTarget);
|
||||
}
|
||||
|
@ -791,7 +751,7 @@ void hbMgrCleanUp() {
|
|||
clientHbMgr.appHbMgrs = NULL;
|
||||
}
|
||||
|
||||
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
|
||||
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
|
||||
// init hash in activeinfo
|
||||
void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
if (data != NULL) {
|
||||
|
@ -799,17 +759,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
|
|||
}
|
||||
SClientHbReq hbReq = {0};
|
||||
hbReq.connKey = connKey;
|
||||
hbReq.clusterId = clusterId;
|
||||
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
|
||||
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
|
||||
|
||||
// init hash
|
||||
if (info != NULL) {
|
||||
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
info->req = pReq;
|
||||
taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
return 0;
|
||||
}
|
||||
|
@ -819,15 +773,10 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
|
|||
.tscRid = tscRefId,
|
||||
.connType = connType,
|
||||
};
|
||||
SHbConnInfo info = {0};
|
||||
|
||||
switch (connType) {
|
||||
case CONN_TYPE__QUERY: {
|
||||
int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pClusterId = clusterId;
|
||||
|
||||
info.param = pClusterId;
|
||||
return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
|
||||
return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
|
||||
}
|
||||
case CONN_TYPE__TMQ: {
|
||||
return 0;
|
||||
|
@ -844,26 +793,10 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
|
|||
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
}
|
||||
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
||||
if (info) {
|
||||
taosMemoryFree(info->param);
|
||||
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
||||
}
|
||||
|
||||
if (NULL == pReq || NULL == info) {
|
||||
if (NULL == pReq) {
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
}
|
||||
|
||||
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
|
||||
int32_t valueLen) {
|
||||
// find req by connection id
|
||||
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
ASSERT(pReq != NULL);
|
||||
|
||||
taosHashPut(pReq->info, key, keyLen, value, valueLen);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -180,7 +180,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
taosMemoryFreeClear(output.dbVgroup);
|
||||
|
||||
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
|
||||
} else if (output.dbVgroup) {
|
||||
} else if (output.dbVgroup && output.dbVgroup->vgHash) {
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
|
||||
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
|
|
|
@ -2332,6 +2332,8 @@ static int32_t isSchemalessDb(SSmlHandle* info){
|
|||
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
taosArrayDestroy(pInfo.pRetensions);
|
||||
|
||||
if (!pInfo.schemaless){
|
||||
info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF;
|
||||
smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname);
|
||||
|
|
|
@ -694,7 +694,6 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
|
|||
pReq->pFields = NULL;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
@ -3674,12 +3673,12 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
|||
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
||||
}
|
||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
||||
if (tEncodeI32(pCoder, pSma->vgEpSet[v].vgId) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
|
||||
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
|
||||
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
||||
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
|
||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
||||
const SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
|
||||
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
||||
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
|
||||
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
|
||||
}
|
||||
|
@ -3712,17 +3711,27 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
|||
} else {
|
||||
pSma->tagsFilter = NULL;
|
||||
}
|
||||
if (pSma->numOfVgroups > 0) {
|
||||
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
|
||||
if (!pSma->pVgEpSet) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(pSma->pVgEpSet, 0, pSma->numOfVgroups * sizeof(SVgEpSet));
|
||||
|
||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
||||
if (tDecodeI32(pCoder, &pSma->vgEpSet[v].vgId) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.numOfEps) < 0) return -1;
|
||||
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
|
||||
if (tDecodeI32(pCoder, &pSma->pVgEpSet[v].vgId) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.numOfEps) < 0) return -1;
|
||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
||||
SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
|
||||
SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
||||
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
|
||||
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -3765,7 +3774,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* pReq) {
|
||||
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder *pCoder, const SVGetTsmaExpWndsReq *pReq) {
|
||||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
||||
|
@ -3776,7 +3785,7 @@ int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq*
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq) {
|
||||
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder *pCoder, SVGetTsmaExpWndsReq *pReq) {
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
||||
|
@ -3787,7 +3796,7 @@ int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq) {
|
||||
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder *pCoder, const SVGetTsmaExpWndsRsp *pReq) {
|
||||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
||||
|
@ -3814,7 +3823,7 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
|
||||
int32_t tEncodeSVDeleteReq(SEncoder *pCoder, const SVDeleteReq *pReq) {
|
||||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1;
|
||||
|
@ -3832,7 +3841,7 @@ int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
|
||||
int32_t tDecodeSVDeleteReq(SDecoder *pCoder, SVDeleteReq *pReq) {
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1;
|
||||
|
@ -3850,7 +3859,7 @@ int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
|
||||
int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) {
|
||||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
|
||||
|
@ -3860,7 +3869,7 @@ int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq) {
|
||||
int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) {
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
|
||||
|
@ -4559,7 +4568,7 @@ int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
|
||||
void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
|
||||
if (NULL == pRsp) {
|
||||
return;
|
||||
}
|
||||
|
@ -4569,6 +4578,3 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
|
|||
taosMemoryFree(pRsp->pMeta);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -316,10 +316,12 @@ typedef struct {
|
|||
int32_t tagsFilterLen;
|
||||
int32_t sqlLen;
|
||||
int32_t astLen;
|
||||
int32_t numOfVgroups;
|
||||
char* expr;
|
||||
char* tagsFilter;
|
||||
char* sql;
|
||||
char* ast;
|
||||
SVgEpSet* pVgEpSet;
|
||||
} SSmaObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -37,7 +37,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
|||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colIds);
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
|
|||
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
|
||||
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
|
||||
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
|
||||
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups);
|
||||
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
||||
|
@ -262,6 +263,8 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
|
|||
req.sliding = pSma->sliding;
|
||||
req.expr = pSma->expr;
|
||||
req.tagsFilter = pSma->tagsFilter;
|
||||
req.numOfVgroups = pSma->numOfVgroups;
|
||||
req.pVgEpSet = pSma->pVgEpSet;
|
||||
|
||||
// get length
|
||||
int32_t ret = 0;
|
||||
|
@ -420,6 +423,15 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
|||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
// todo add sma info here
|
||||
SVgEpSet *pVgEpSet = NULL;
|
||||
int32_t numOfVgroups = 0;
|
||||
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pSma->pVgEpSet = pVgEpSet;
|
||||
pSma->numOfVgroups = numOfVgroups;
|
||||
|
||||
int32_t smaContLen = 0;
|
||||
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
||||
if (pSmaReq == NULL) return -1;
|
||||
|
@ -964,3 +976,52 @@ static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
SVgEpSet *pVgEpSet = NULL;
|
||||
int32_t nAllocVgs = 16;
|
||||
int32_t nVgs = 0;
|
||||
|
||||
pVgEpSet = taosMemoryCalloc(nAllocVgs, sizeof(SVgEpSet));
|
||||
if (!pVgEpSet) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nVgs >= nAllocVgs) {
|
||||
void *p = taosMemoryRealloc(pVgEpSet, nAllocVgs * 2 * sizeof(SVgEpSet));
|
||||
if (!p) {
|
||||
taosMemoryFree(pVgEpSet);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pVgEpSet = (SVgEpSet *)p;
|
||||
nAllocVgs *= 2;
|
||||
}
|
||||
|
||||
(pVgEpSet + nVgs)->vgId = pVgroup->vgId;
|
||||
(pVgEpSet + nVgs)->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
||||
++nVgs;
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
*ppVgEpSet = pVgEpSet;
|
||||
*numOfVgroups = nVgs;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "mndPerfSchema.h"
|
||||
#include "mndScheduler.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
|
@ -394,14 +395,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
|
||||
req.pRSmaParam.delay = pStb->delay;
|
||||
if (pStb->ast1Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len,
|
||||
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
if (pStb->ast2Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len,
|
||||
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -949,13 +950,18 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -968,7 +974,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
|
||||
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
|
||||
if ((int32_t)taosArrayGetSize(pFields) != 2) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
|
@ -986,6 +992,11 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
|
|||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST;
|
||||
return -1;
|
||||
|
@ -1008,13 +1019,18 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1075,7 +1091,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) {
|
||||
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
|
||||
if (col < 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
|
@ -1092,6 +1108,11 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[col].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1104,7 +1125,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name);
|
||||
if (col < 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
|
@ -1121,6 +1142,11 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[col].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1199,7 +1225,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
|
||||
taosRLockLatch(&pStb->lock);
|
||||
|
||||
|
@ -1269,8 +1294,8 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) {
|
||||
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont,
|
||||
int32_t *pLen) {
|
||||
int ret;
|
||||
SEncoder ec = {0};
|
||||
uint32_t contLen = 0;
|
||||
|
@ -1296,7 +1321,7 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
|
|||
return ret;
|
||||
}
|
||||
|
||||
void* cont = taosMemoryMalloc(contLen);
|
||||
void *cont = taosMemoryMalloc(contLen);
|
||||
tEncoderInit(&ec, cont, contLen);
|
||||
tEncodeSMAlterStbRsp(&ec, &alterRsp);
|
||||
tEncoderClear(&ec);
|
||||
|
@ -1311,18 +1336,18 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
|
|||
|
||||
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||
bool needRsp = true;
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = NULL;
|
||||
SField *pField0 = NULL;
|
||||
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pOld->lock);
|
||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||
taosRUnLockLatch(&pOld->lock);
|
||||
stbObj.pColumns = NULL;
|
||||
stbObj.pTags = NULL;
|
||||
stbObj.updateTime = taosGetTimestampMs();
|
||||
stbObj.lock = 0;
|
||||
taosRUnLockLatch(&pOld->lock);
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = NULL;
|
||||
SField *pField0 = NULL;
|
||||
|
||||
switch (pAlter->alterType) {
|
||||
case TSDB_ALTER_TABLE_ADD_TAG:
|
||||
|
@ -1330,25 +1355,25 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
|||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_TAG:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndDropSuperTableTag(pOld, &stbObj, pField0->name);
|
||||
code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
|
||||
code = mndAlterStbTagName(pOld, &stbObj, pAlter->pFields);
|
||||
code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndAlterStbTagBytes(pOld, &stbObj, pField0);
|
||||
code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndDropSuperTableColumn(pOld, &stbObj, pField0->name);
|
||||
code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
|
||||
code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
|
||||
needRsp = false;
|
||||
|
@ -1370,9 +1395,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
|||
mndTransSetDbName(pTrans, pDb->name);
|
||||
|
||||
if (needRsp) {
|
||||
void* pCont = NULL;
|
||||
void *pCont = NULL;
|
||||
int32_t contLen = 0;
|
||||
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen)) goto _OVER;
|
||||
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
|
||||
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
||||
}
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
|||
return strchr(topic, '.') + 1;
|
||||
}
|
||||
|
||||
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colAndTagIds) {
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
bool found = false;
|
||||
|
@ -105,20 +105,21 @@ bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *col
|
|||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(colAndTagIds); i++) {
|
||||
int16_t *pColId = taosArrayGet(colAndTagIds, i);
|
||||
if (taosHashGet(pColHash, pColId, sizeof(int16_t)) != NULL) {
|
||||
if (taosHashGet(pColHash, &colId, sizeof(int16_t)) != NULL) {
|
||||
found = true;
|
||||
goto NEXT;
|
||||
}
|
||||
}
|
||||
|
||||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
if (found) return false;
|
||||
if (found) {
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
return -1;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||
|
|
|
@ -103,7 +103,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
|
|||
tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
|
||||
indexMultiTermDestroy(terms);
|
||||
#endif
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||
|
|
|
@ -52,6 +52,7 @@ enum {
|
|||
CTG_OP_UPDATE_VGROUP = 0,
|
||||
CTG_OP_UPDATE_TB_META,
|
||||
CTG_OP_DROP_DB_CACHE,
|
||||
CTG_OP_DROP_DB_VGROUP,
|
||||
CTG_OP_DROP_STB_META,
|
||||
CTG_OP_DROP_TB_META,
|
||||
CTG_OP_UPDATE_USER,
|
||||
|
@ -266,26 +267,32 @@ typedef struct SCtgUpdateTblMsg {
|
|||
STableMetaOutput* output;
|
||||
} SCtgUpdateTblMsg;
|
||||
|
||||
typedef struct SCtgRemoveDBMsg {
|
||||
typedef struct SCtgDropDBMsg {
|
||||
SCatalog* pCtg;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
uint64_t dbId;
|
||||
} SCtgRemoveDBMsg;
|
||||
} SCtgDropDBMsg;
|
||||
|
||||
typedef struct SCtgRemoveStbMsg {
|
||||
typedef struct SCtgDropDbVgroupMsg {
|
||||
SCatalog* pCtg;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
} SCtgDropDbVgroupMsg;
|
||||
|
||||
|
||||
typedef struct SCtgDropStbMetaMsg {
|
||||
SCatalog* pCtg;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
uint64_t dbId;
|
||||
uint64_t suid;
|
||||
} SCtgRemoveStbMsg;
|
||||
} SCtgDropStbMetaMsg;
|
||||
|
||||
typedef struct SCtgRemoveTblMsg {
|
||||
typedef struct SCtgDropTblMetaMsg {
|
||||
SCatalog* pCtg;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
uint64_t dbId;
|
||||
} SCtgRemoveTblMsg;
|
||||
} SCtgDropTblMetaMsg;
|
||||
|
||||
typedef struct SCtgUpdateUserMsg {
|
||||
SCatalog* pCtg;
|
||||
|
@ -451,6 +458,7 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl
|
|||
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
|
||||
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
|
||||
int32_t ctgOpDropDbCache(SCtgCacheOperation *action);
|
||||
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *action);
|
||||
int32_t ctgOpDropStbMeta(SCtgCacheOperation *action);
|
||||
int32_t ctgOpDropTbMeta(SCtgCacheOperation *action);
|
||||
int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
|
||||
|
@ -464,6 +472,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
|
|||
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
|
||||
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
|
||||
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
|
||||
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
|
||||
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
|
||||
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
|
||||
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
|
||||
|
|
|
@ -286,6 +286,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
int32_t taskIdx = 0;
|
||||
for (int32_t i = 0; i < dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
if (pReq->forceUpdate) {
|
||||
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
||||
}
|
||||
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
|
||||
}
|
||||
|
||||
|
@ -301,6 +304,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
|
||||
for (int32_t i = 0; i < tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
if (pReq->forceUpdate) {
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
}
|
||||
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
|
|||
"drop DB",
|
||||
ctgOpDropDbCache
|
||||
},
|
||||
{
|
||||
CTG_OP_DROP_DB_VGROUP,
|
||||
"drop DBVgroup",
|
||||
ctgOpDropDbVgroup
|
||||
},
|
||||
{
|
||||
CTG_OP_DROP_STB_META,
|
||||
"drop stbMeta",
|
||||
|
@ -563,9 +568,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
|
|||
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
|
||||
int32_t code = 0;
|
||||
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE};
|
||||
SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg));
|
||||
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
|
@ -590,13 +595,43 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
|
||||
int32_t code = 0;
|
||||
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp};
|
||||
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
char *p = strchr(dbFName, '.');
|
||||
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
|
||||
dbFName = p + 1;
|
||||
}
|
||||
|
||||
msg->pCtg = pCtg;
|
||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||
|
||||
action.data = msg;
|
||||
|
||||
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(action.data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
|
||||
int32_t code = 0;
|
||||
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp};
|
||||
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
|
||||
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
|
@ -623,9 +658,9 @@ _return:
|
|||
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
|
||||
int32_t code = 0;
|
||||
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp};
|
||||
SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg));
|
||||
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
|
@ -1281,7 +1316,7 @@ _return:
|
|||
|
||||
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
SCtgRemoveDBMsg *msg = operation->data;
|
||||
SCtgDropDBMsg *msg = operation->data;
|
||||
SCatalog* pCtg = msg->pCtg;
|
||||
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
|
@ -1304,6 +1339,33 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
SCtgDropDbVgroupMsg *msg = operation->data;
|
||||
SCatalog* pCtg = msg->pCtg;
|
||||
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
|
||||
if (NULL == dbCache) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
|
||||
|
||||
ctgFreeVgInfo(dbCache->vgInfo);
|
||||
dbCache->vgInfo = NULL;
|
||||
|
||||
ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);
|
||||
|
||||
ctgWReleaseVgInfo(dbCache);
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
|
@ -1353,7 +1415,7 @@ _return:
|
|||
|
||||
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
SCtgRemoveStbMsg *msg = operation->data;
|
||||
SCtgDropStbMetaMsg *msg = operation->data;
|
||||
SCatalog* pCtg = msg->pCtg;
|
||||
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
|
@ -1399,7 +1461,7 @@ _return:
|
|||
|
||||
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
SCtgRemoveTblMsg *msg = operation->data;
|
||||
SCtgDropTblMetaMsg *msg = operation->data;
|
||||
SCatalog* pCtg = msg->pCtg;
|
||||
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
|
|
|
@ -132,7 +132,22 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) {
|
||||
|
||||
/*
|
||||
prepare SQL:
|
||||
create database db1;
|
||||
use db1;
|
||||
create stable st1 (ts timestamp, f1 int) tags(t1 int);
|
||||
create table tb1 using st1 tags(1);
|
||||
insert into tb1 values (now, 1);
|
||||
create qnode on dnode 1;
|
||||
create user user1 pass "abc";
|
||||
create database db2;
|
||||
grant write on db2.* to user1;
|
||||
create function udf1 as '/tmp/libudf1.so' outputtype int;
|
||||
create aggregate function udf2 as '/tmp/libudf2.so' outputtype int;
|
||||
*/
|
||||
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) {
|
||||
int32_t code = 0;
|
||||
SCatalogReq req = {0};
|
||||
req.pTableMeta = taosArrayInit(2, sizeof(SName));
|
||||
|
@ -144,6 +159,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
|
|||
req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
|
||||
req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo));
|
||||
req.qNodeRequired = true;
|
||||
req.forceUpdate = forceUpdate;
|
||||
|
||||
SName name = {0};
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
|
|
|
@ -38,7 +38,7 @@ typedef struct SIFParam {
|
|||
col_id_t colId;
|
||||
int64_t suid; // add later
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char colName[TSDB_COL_NAME_LEN];
|
||||
char colName[TSDB_COL_NAME_LEN * 2 + 4];
|
||||
|
||||
SIndexMetaArg arg;
|
||||
} SIFParam;
|
||||
|
|
|
@ -34,6 +34,7 @@ extern "C" {
|
|||
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
||||
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
|
||||
#define QW_SCH_TIMEOUT_MSEC 180000
|
||||
|
||||
enum {
|
||||
QW_PHASE_PRE_QUERY = 1,
|
||||
|
@ -137,7 +138,7 @@ typedef struct SQWTaskCtx {
|
|||
} SQWTaskCtx;
|
||||
|
||||
typedef struct SQWSchStatus {
|
||||
int32_t lastAccessTs; // timestamp in second
|
||||
int64_t hbBrokenTs; // timestamp in msecond
|
||||
SRWLatch hbConnLock;
|
||||
SRpcHandleInfo hbConnInfo;
|
||||
SQueryNodeEpId hbEpId;
|
||||
|
@ -354,6 +355,8 @@ int32_t qwOpenRef(void);
|
|||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
|
||||
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
|
||||
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
|
||||
void qwClearExpiredSch(SArray* pExpiredSch);
|
||||
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
|
||||
|
|
|
@ -535,3 +535,9 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
|
||||
void qwClearExpiredSch(SArray* pExpiredSch) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -21,10 +21,12 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
|
|||
SSchedulerHbRsp rsp = {0};
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
|
||||
QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
|
||||
|
||||
QW_LOCK(QW_WRITE, &sch->hbConnLock);
|
||||
|
||||
sch->hbBrokenTs = taosGetTimestampMs();
|
||||
|
||||
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
|
||||
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
|
||||
sch->hbConnInfo.handle = NULL;
|
||||
|
@ -794,6 +796,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
SQWSchStatus *sch = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SQWHbInfo *rspList = NULL;
|
||||
SArray *pExpiredSch = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
qwDbgDumpMgmtInfo(mgmt);
|
||||
|
@ -809,8 +812,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
}
|
||||
|
||||
rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
|
||||
if (NULL == rspList) {
|
||||
pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
|
||||
if (NULL == rspList || NULL == pExpiredSch) {
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
taosMemoryFree(rspList);
|
||||
taosArrayDestroy(pExpiredSch);
|
||||
QW_ELOG("calloc %d SQWHbInfo failed", schNum);
|
||||
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
|
||||
qwRelease(refId);
|
||||
|
@ -820,6 +826,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
void *key = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t i = 0;
|
||||
int64_t currentMs = taosGetTimestampMs();
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
|
@ -827,6 +834,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
if (NULL == sch->hbConnInfo.handle) {
|
||||
uint64_t *sId = taosHashGetKey(pIter, NULL);
|
||||
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
|
||||
|
||||
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch->tasksHash) <= 0) {
|
||||
taosArrayPush(pExpiredSch, sId);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
continue;
|
||||
}
|
||||
|
@ -852,7 +864,12 @@ _return:
|
|||
tFreeSSchedulerHbRsp(&rspList[j].rsp);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pExpiredSch) > 0) {
|
||||
qwClearExpiredSch(pExpiredSch);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(rspList);
|
||||
taosArrayDestroy(pExpiredSch);
|
||||
|
||||
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
|
||||
qwRelease(refId);
|
||||
|
|
|
@ -102,11 +102,11 @@ typedef struct SSchedulerMgmt {
|
|||
uint64_t taskId; // sequential taksId
|
||||
uint64_t sId; // schedulerId
|
||||
SSchedulerCfg cfg;
|
||||
SRWLatch lock;
|
||||
bool exit;
|
||||
int32_t jobRef;
|
||||
int32_t jobNum;
|
||||
SSchStat stat;
|
||||
SRWLatch hbLock;
|
||||
SHashObj *hbConnections;
|
||||
} SSchedulerMgmt;
|
||||
|
||||
|
@ -320,6 +320,8 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
|
||||
|
||||
|
||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
||||
void schCleanClusterHb(void* pTrans);
|
||||
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
||||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
|
||||
SSchJob *schAcquireJob(int64_t refId);
|
||||
|
|
|
@ -126,30 +126,6 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (!pTask->registerdHb) {
|
||||
return;
|
||||
}
|
||||
|
||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
SQueryNodeEpId epId = {0};
|
||||
|
||||
epId.nodeId = addr->nodeId;
|
||||
|
||||
SEp* pEp = SCH_GET_CUR_EP(addr);
|
||||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
||||
epId.ep.port = pEp->port;
|
||||
|
||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_sub_fetch_64(&hb->taskNum, 1);
|
||||
|
||||
pTask->registerdHb = false;
|
||||
}
|
||||
|
||||
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
|
||||
schDeregisterTaskHb(pJob, pTask);
|
||||
|
@ -377,15 +353,21 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx));
|
||||
if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) {
|
||||
SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx);
|
||||
} else {
|
||||
SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx);
|
||||
}
|
||||
|
||||
if (execIdx != pTask->execIdx) { // ignore it
|
||||
SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx);
|
||||
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
|
||||
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
|
||||
if (taosHashGetSize(pTask->execNodes) <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -393,6 +375,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
|
|||
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx));
|
||||
nodeInfo->handle = handle;
|
||||
|
||||
SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -403,7 +387,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
|
|||
|
||||
SCH_SET_TASK_HANDLE(pTask, handle);
|
||||
|
||||
schUpdateTaskExecNode(pTask, handle, execIdx);
|
||||
schUpdateTaskExecNode(pJob, pTask, handle, execIdx);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -551,6 +535,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("set %dth condidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
|
||||
|
||||
++addNum;
|
||||
}
|
||||
}
|
||||
|
@ -1110,6 +1096,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_UNLOCK(SCH_WRITE, &parent->lock);
|
||||
|
||||
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
||||
SCH_TASK_DLOG("all %d children task done, start to launch parent task %" PRIx64, readyNum, parent->taskId);
|
||||
SCH_ERR_RET(schLaunchTask(pJob, parent));
|
||||
}
|
||||
}
|
||||
|
@ -1186,7 +1173,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
|
|||
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("task has %d exec address", size);
|
||||
SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1196,7 +1183,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
SCH_LOCK_TASK(pTask);
|
||||
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) {
|
||||
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
|
||||
SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx);
|
||||
schDropTaskOnExecNode(pJob, pTask);
|
||||
taosHashClear(pTask->execNodes);
|
||||
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
|
||||
|
@ -1306,9 +1294,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
|
|||
int32_t code = 0;
|
||||
|
||||
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
||||
|
||||
pTask->execIdx++;
|
||||
|
||||
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx);
|
||||
|
||||
SCH_LOG_TASK_START_TS(pTask);
|
||||
|
||||
if (schJobNeedToStop(pJob, &status)) {
|
||||
|
@ -1471,9 +1460,10 @@ void schFreeJobImpl(void *job) {
|
|||
|
||||
qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
|
||||
|
||||
atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
|
||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
if (jobNum == 0) {
|
||||
schCloseJobRef();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
|
|
|
@ -648,31 +648,6 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
|
||||
int32_t code = 0;
|
||||
SSchHbTrans hb = {0};
|
||||
|
||||
hb.trans.pTrans = pJob->pTrans;
|
||||
|
||||
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
|
||||
|
||||
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
|
||||
if (code) {
|
||||
schFreeRpcCtx(&hb.rpcCtx);
|
||||
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
*exist = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
|
||||
SCH_ERR_RET(code);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
|
||||
SSchedulerHbReq req = {0};
|
||||
int32_t code = 0;
|
||||
|
@ -684,17 +659,20 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
|
|||
req.sId = schMgmt.sId;
|
||||
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
|
||||
|
||||
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
|
||||
nodeEpId->ep.port);
|
||||
SCH_ERR_RET(code);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &hb->lock);
|
||||
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
|
||||
memcpy(&trans, &hb->trans, sizeof(trans));
|
||||
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
|
||||
SCH_ERR_RET(code);
|
||||
|
||||
|
@ -764,60 +742,6 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
|
||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
SQueryNodeEpId epId = {0};
|
||||
|
||||
epId.nodeId = addr->nodeId;
|
||||
|
||||
SEp* pEp = SCH_GET_CUR_EP(addr);
|
||||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
||||
epId.ep.port = pEp->port;
|
||||
|
||||
SSchHbTrans *hb = NULL;
|
||||
while (true) {
|
||||
hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
bool exist = false;
|
||||
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist));
|
||||
if (!exist) {
|
||||
SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&hb->taskNum, 1);
|
||||
|
||||
pTask->registerdHb = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
|
||||
int32_t code = 0;
|
||||
SSchHbTrans *hb = NULL;
|
||||
|
||||
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &hb->lock);
|
||||
memcpy(&hb->trans, trans, sizeof(*trans));
|
||||
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
||||
|
||||
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
|
||||
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
SSchedulerHbRsp rsp = {0};
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
|
@ -1037,6 +961,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
if (NULL == addr) {
|
||||
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
isCandidateAddr = true;
|
||||
SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx);
|
||||
}
|
||||
|
||||
SEpSet epSet = addr->epSet;
|
||||
|
|
|
@ -21,17 +21,189 @@
|
|||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
|
||||
|
||||
void schCleanClusterHb(void* pTrans) {
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
||||
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
|
||||
while (hb) {
|
||||
if (hb->trans.pTrans == pTrans) {
|
||||
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
|
||||
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
|
||||
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||
}
|
||||
|
||||
hb = taosHashIterate(schMgmt.hbConnections, hb);
|
||||
}
|
||||
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
}
|
||||
|
||||
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
|
||||
return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId->nodeId, epId->ep.fqdn, epId->ep.port);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int64_t taskNum = atomic_load_64(&hb->taskNum);
|
||||
if (taskNum <= 0) {
|
||||
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
|
||||
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
|
||||
int32_t code = 0;
|
||||
SSchHbTrans hb = {0};
|
||||
|
||||
hb.trans.pTrans = pJob->pTrans;
|
||||
hb.taskNum = 1;
|
||||
|
||||
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
|
||||
if (code) {
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
schFreeRpcCtx(&hb.rpcCtx);
|
||||
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
*exist = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
|
||||
SCH_ERR_RET(code);
|
||||
}
|
||||
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *pEpId) {
|
||||
SSchHbTrans *hb = NULL;
|
||||
|
||||
while (true) {
|
||||
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
||||
hb = taosHashGet(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
bool exist = false;
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
SCH_ERR_RET(schAddHbConnection(pJob, pTask, pEpId, &exist));
|
||||
if (!exist) {
|
||||
SCH_RET(schBuildAndSendHbMsg(pEpId, NULL));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&hb->taskNum, 1);
|
||||
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (!pTask->registerdHb) {
|
||||
return;
|
||||
}
|
||||
|
||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
SQueryNodeEpId epId = {0};
|
||||
|
||||
epId.nodeId = addr->nodeId;
|
||||
|
||||
SEp* pEp = SCH_GET_CUR_EP(addr);
|
||||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
||||
epId.ep.port = pEp->port;
|
||||
|
||||
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
SCH_TASK_WLOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
|
||||
if (0 == taskNum) {
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
schRemoveHbConnection(pJob, pTask, &epId);
|
||||
} else {
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
}
|
||||
|
||||
pTask->registerdHb = false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
|
||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
SQueryNodeEpId epId = {0};
|
||||
|
||||
epId.nodeId = addr->nodeId;
|
||||
|
||||
SEp* pEp = SCH_GET_CUR_EP(addr);
|
||||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
||||
epId.ep.port = pEp->port;
|
||||
|
||||
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
|
||||
|
||||
pTask->registerdHb = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
|
||||
int32_t code = 0;
|
||||
SSchHbTrans *hb = NULL;
|
||||
|
||||
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
||||
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &hb->lock);
|
||||
memcpy(&hb->trans, trans, sizeof(*trans));
|
||||
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
|
||||
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
|
||||
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void schCloseJobRef(void) {
|
||||
if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.lock);
|
||||
if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
|
||||
if (schMgmt.jobRef >= 0) {
|
||||
taosCloseRef(schMgmt.jobRef);
|
||||
schMgmt.jobRef = -1;
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
|
||||
}
|
||||
|
||||
uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
|
||||
|
@ -88,4 +260,3 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
|
|||
(*pCtx->freeFunc)(pCtx->brokenVal.val);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -182,6 +182,14 @@ int32_t scheduleCancelJob(int64_t job) {
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
void schedulerStopQueryHb(void *pTrans) {
|
||||
if (NULL == pTrans) {
|
||||
return;
|
||||
}
|
||||
|
||||
schCleanClusterHb(pTrans);
|
||||
}
|
||||
|
||||
void schedulerFreeJob(int64_t job) {
|
||||
SSchJob *pJob = schAcquireJob(job);
|
||||
if (NULL == pJob) {
|
||||
|
@ -220,6 +228,7 @@ void schedulerDestroy(void) {
|
|||
}
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
if (schMgmt.hbConnections) {
|
||||
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
|
||||
while (pIter != NULL) {
|
||||
|
@ -230,4 +239,5 @@ void schedulerDestroy(void) {
|
|||
taosHashCleanup(schMgmt.hbConnections);
|
||||
schMgmt.hbConnections = NULL;
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
}
|
||||
|
|
|
@ -180,6 +180,12 @@ static bool addHandleToAcceptloop(void* arg);
|
|||
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
|
||||
return; \
|
||||
} \
|
||||
if (conn->regArg.init) { \
|
||||
tTrace("server conn %p release, notify server app", conn); \
|
||||
STrans* pTransInst = conn->pTransInst; \
|
||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
|
||||
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
|
||||
} \
|
||||
uvStartSendRespInternal(srvMsg); \
|
||||
return; \
|
||||
} \
|
||||
|
@ -1154,6 +1160,10 @@ int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
|
|||
}
|
||||
SExHandle* ex = thandle;
|
||||
SSvrConn* pConn = ex->handle;
|
||||
if (pConn == NULL) {
|
||||
tTrace("invalid handle %p, failed to Get Conn info", thandle);
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct sockaddr_in addr = pConn->addr;
|
||||
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
|
||||
|
|
|
@ -225,14 +225,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist")
|
||||
|
||||
// mnode-stable
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "Stable not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "Stable confilct with topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "STable not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "STable confilct with topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STBS, "Too many stables")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB, "Invalid stable name")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_ALTER_OPTION, "Invalid stable alter options")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "Stable option unchanged")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "STable option unchanged")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREADY_EXIST, "Tag already exists")
|
||||
|
@ -240,6 +240,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
|
||||
|
||||
// mnode-infoSchema
|
||||
|
|
Loading…
Reference in New Issue