Merge branch '3.0' into szhou/feature/multiwaymerge

This commit is contained in:
shenglian-zhou 2022-06-06 14:39:00 +08:00 committed by GitHub
commit b8508907af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 620 additions and 362 deletions

View File

@ -177,8 +177,8 @@ typedef struct {
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int32_t bytes;
int8_t flags;
int32_t bytes;
} SField;
typedef struct SRetention {
@ -1122,13 +1122,13 @@ typedef struct {
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
typedef struct {
STableMetaRsp* pMeta;
} SMAlterStbRsp;
int32_t tEncodeSMAlterStbRsp(SEncoder *pEncoder, const SMAlterStbRsp *pRsp);
int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp);
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tEncodeSMAlterStbRsp(SEncoder* pEncoder, const SMAlterStbRsp* pRsp);
int32_t tDecodeSMAlterStbRsp(SDecoder* pDecoder, SMAlterStbRsp* pRsp);
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
@ -1982,6 +1982,7 @@ typedef struct {
typedef struct {
SClientHbKey connKey;
int64_t clusterId;
SQueryHbReqBasic* query;
SHashObj* info; // hash<Skv.key, Skv>
} SClientHbReq;
@ -2302,23 +2303,23 @@ typedef struct {
} SVgEpSet;
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
int32_t dstVgId;
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
int32_t numOfVgroups;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t offset; // use unit by precision of DB
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
SVgEpSet vgEpSet[];
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
int32_t dstVgId;
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
int32_t numOfVgroups;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
int64_t offset; // use unit by precision of DB
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
SVgEpSet* pVgEpSet;
} STSma; // Time-range-wise SMA
typedef STSma SVCreateTSmaReq;
@ -2404,7 +2405,7 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
}
typedef struct {
int64_t indexUid;
int64_t indexUid;
STimeWindow queryWindow;
} SVGetTsmaExpWndsReq;

View File

@ -68,6 +68,7 @@ typedef struct SCatalogReq {
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
bool qNodeRequired; // valid qnode
bool forceUpdate;
} SCatalogReq;
typedef struct SMetaData {
@ -280,7 +281,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);
/**

View File

@ -222,7 +222,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define REQUEST_MAX_TRY_TIMES 1

View File

@ -104,6 +104,8 @@ void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
void schedulerStopQueryHb(void *pTrans);
/**
* Cancel query job

View File

@ -235,6 +235,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_COLUMN_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AD)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
// mnode-infoSchema

View File

@ -57,11 +57,6 @@ enum {
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {
void* param;
SClientHbReq* req;
} SHbConnInfo;
typedef struct {
char* key;
// statistics
@ -71,11 +66,8 @@ typedef struct {
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
SAppInstInfo* pAppInstInfo;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
@ -325,8 +317,6 @@ void appHbMgrCleanup(void);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// --- mq
void hbMgrInitMqHbRspHandle();

View File

@ -130,8 +130,13 @@ void destroyTscObj(void *pObj) {
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
if (0 == connNum) {
// TODO
//closeTransporter(pTscObj);
}
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj);
@ -223,6 +228,10 @@ static void doDestroyRequest(void *p) {
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pDb);
@ -230,10 +239,6 @@ static void doDestroyRequest(void *p) {
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryPlan(pRequest->body.pDag);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);

View File

@ -129,9 +129,9 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
}
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == pReq) {
tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
pRsp->connKey.connType);
return TSDB_CODE_SUCCESS;
}
@ -181,12 +181,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
@ -199,12 +198,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
@ -217,12 +215,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
@ -547,13 +544,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
continue;
}
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq);
if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
continue;
}
//hbClearClientHbReq(pOneReq);
@ -569,23 +563,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return pBatchReq;
}
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq *pOneReq = pIter;
tFreeReqKvHash(pOneReq->info);
taosHashClear(pOneReq->info);
if (pOneReq->query) {
taosArrayDestroy(pOneReq->query->queryDesc);
taosMemoryFreeClear(pOneReq->query);
}
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
}
void hbThreadFuncUnexpectedStopped(void) {
atomic_store_8(&clientHbMgr.threadStop, 2);
}
@ -715,14 +692,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
}
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
// init getInfoFunc
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (pAppHbMgr->connInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pAppHbMgr);
return NULL;
}
taosThreadMutexLock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
@ -745,15 +714,6 @@ void appHbMgrCleanup(void) {
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
pIter = taosHashIterate(pTarget->connInfo, NULL);
while (pIter != NULL) {
SHbConnInfo *info = pIter;
taosMemoryFree(info->param);
pIter = taosHashIterate(pTarget->connInfo, pIter);
}
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
taosMemoryFree(pTarget->key);
taosMemoryFree(pTarget);
}
@ -791,7 +751,7 @@ void hbMgrCleanUp() {
clientHbMgr.appHbMgrs = NULL;
}
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
// init hash in activeinfo
void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
@ -799,17 +759,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
}
SClientHbReq hbReq = {0};
hbReq.connKey = connKey;
hbReq.clusterId = clusterId;
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash
if (info != NULL) {
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
info->req = pReq;
taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
}
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
return 0;
}
@ -819,15 +773,10 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
.tscRid = tscRefId,
.connType = connType,
};
SHbConnInfo info = {0};
switch (connType) {
case CONN_TYPE__QUERY: {
int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t));
*pClusterId = clusterId;
info.param = pClusterId;
return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
}
case CONN_TYPE__TMQ: {
return 0;
@ -844,26 +793,10 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
}
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
if (info) {
taosMemoryFree(info->param);
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
}
if (NULL == pReq || NULL == info) {
if (NULL == pReq) {
return;
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
int32_t valueLen) {
// find req by connection id
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL);
taosHashPut(pReq->info, key, keyLen, value, valueLen);
return 0;
}

View File

@ -180,7 +180,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
taosMemoryFreeClear(output.dbVgroup);
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup) {
} else if (output.dbVgroup && output.dbVgroup->vgHash) {
struct SCatalog* pCatalog = NULL;
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);

View File

@ -2332,6 +2332,8 @@ static int32_t isSchemalessDb(SSmlHandle* info){
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
return code;
}
taosArrayDestroy(pInfo.pRetensions);
if (!pInfo.schemaless){
info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname);

View File

@ -694,7 +694,6 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
pReq->pFields = NULL;
}
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -3674,12 +3673,12 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
}
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tEncodeI32(pCoder, pSma->vgEpSet[v].vgId) < 0) return -1;
if (tEncodeI8(pCoder, pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
for (int32_t n = 0; n < numOfEps; ++n) {
const SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
}
@ -3712,15 +3711,25 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
} else {
pSma->tagsFilter = NULL;
}
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tDecodeI32(pCoder, &pSma->vgEpSet[v].vgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.numOfEps) < 0) return -1;
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
for (int32_t n = 0; n < numOfEps; ++n) {
SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
if (pSma->numOfVgroups > 0) {
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
if (!pSma->pVgEpSet) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memset(pSma->pVgEpSet, 0, pSma->numOfVgroups * sizeof(SVgEpSet));
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tDecodeI32(pCoder, &pSma->pVgEpSet[v].vgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.numOfEps) < 0) return -1;
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
for (int32_t n = 0; n < numOfEps; ++n) {
SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
}
}
}
@ -3765,7 +3774,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
return 0;
}
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* pReq) {
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder *pCoder, const SVGetTsmaExpWndsReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
@ -3773,10 +3782,10 @@ int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq*
if (tEncodeI64(pCoder, pReq->queryWindow.ekey) < 0) return -1;
tEndEncode(pCoder);
return 0;
return 0;
}
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq) {
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder *pCoder, SVGetTsmaExpWndsReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
@ -3787,7 +3796,7 @@ int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq)
return 0;
}
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq) {
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder *pCoder, const SVGetTsmaExpWndsRsp *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
@ -3814,7 +3823,7 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq)
return 0;
}
int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
int32_t tEncodeSVDeleteReq(SEncoder *pCoder, const SVDeleteReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1;
@ -3832,7 +3841,7 @@ int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) {
return 0;
}
int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
int32_t tDecodeSVDeleteReq(SDecoder *pCoder, SVDeleteReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1;
@ -3850,7 +3859,7 @@ int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) {
return 0;
}
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
@ -3860,7 +3869,7 @@ int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
return 0;
}
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq) {
int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
@ -4502,7 +4511,7 @@ int32_t tDecodeSVAlterTbRsp(SDecoder *pDecoder, SVAlterTbRsp *pRsp) {
}
int32_t tDeserializeSVAlterTbRsp(void *buf, int32_t bufLen, SVAlterTbRsp *pRsp) {
int32_t meta = 0;
int32_t meta = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -4543,7 +4552,7 @@ int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp) {
}
int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp) {
int32_t meta = 0;
int32_t meta = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -4559,7 +4568,7 @@ int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp
return 0;
}
void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
if (NULL == pRsp) {
return;
}
@ -4569,6 +4578,3 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) {
taosMemoryFree(pRsp->pMeta);
}
}

View File

@ -298,28 +298,30 @@ typedef struct {
} SVgObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t timezone;
int32_t dstVgId; // for stream
int64_t interval;
int64_t offset;
int64_t sliding;
int32_t exprLen; // strlen + 1
int32_t tagsFilterLen;
int32_t sqlLen;
int32_t astLen;
char* expr;
char* tagsFilter;
char* sql;
char* ast;
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t timezone;
int32_t dstVgId; // for stream
int64_t interval;
int64_t offset;
int64_t sliding;
int32_t exprLen; // strlen + 1
int32_t tagsFilterLen;
int32_t sqlLen;
int32_t astLen;
int32_t numOfVgroups;
char* expr;
char* tagsFilter;
char* sql;
char* ast;
SVgEpSet* pVgEpSet;
} SSmaObj;
typedef struct {

View File

@ -37,7 +37,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colIds);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId);
#ifdef __cplusplus
}

View File

@ -36,6 +36,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups);
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
@ -262,7 +263,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
req.sliding = pSma->sliding;
req.expr = pSma->expr;
req.tagsFilter = pSma->tagsFilter;
req.numOfVgroups = pSma->numOfVgroups;
req.pVgEpSet = pSma->pVgEpSet;
// get length
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret);
@ -420,6 +423,15 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
mndReleaseDnode(pMnode, pDnode);
// todo add sma info here
SVgEpSet *pVgEpSet = NULL;
int32_t numOfVgroups = 0;
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
return -1;
}
pSma->pVgEpSet = pVgEpSet;
pSma->numOfVgroups = numOfVgroups;
int32_t smaContLen = 0;
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
if (pSmaReq == NULL) return -1;
@ -964,3 +976,52 @@ static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
SVgEpSet *pVgEpSet = NULL;
int32_t nAllocVgs = 16;
int32_t nVgs = 0;
pVgEpSet = taosMemoryCalloc(nAllocVgs, sizeof(SVgEpSet));
if (!pVgEpSet) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (nVgs >= nAllocVgs) {
void *p = taosMemoryRealloc(pVgEpSet, nAllocVgs * 2 * sizeof(SVgEpSet));
if (!p) {
taosMemoryFree(pVgEpSet);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pVgEpSet = (SVgEpSet *)p;
nAllocVgs *= 2;
}
(pVgEpSet + nVgs)->vgId = pVgroup->vgId;
(pVgEpSet + nVgs)->epSet = mndGetVgroupEpset(pMnode, pVgroup);
++nVgs;
sdbRelease(pSdb, pVgroup);
}
*ppVgEpSet = pVgEpSet;
*numOfVgroups = nVgs;
return 0;
}

View File

@ -23,6 +23,7 @@
#include "mndPerfSchema.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
@ -394,14 +395,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
req.pRSmaParam.delay = pStb->delay;
if (pStb->ast1Len > 0) {
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) !=
TSDB_CODE_SUCCESS) {
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len,
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL;
}
}
if (pStb->ast2Len > 0) {
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) !=
TSDB_CODE_SUCCESS) {
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len,
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
return NULL;
}
}
@ -949,13 +950,18 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
return 0;
}
static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
return -1;
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
@ -968,7 +974,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
return 0;
}
static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
if ((int32_t)taosArrayGetSize(pFields) != 2) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
@ -986,6 +992,11 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
return -1;
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST;
return -1;
@ -1008,13 +1019,18 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
return 0;
}
static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name);
if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
return -1;
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
@ -1075,7 +1091,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return 0;
}
static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) {
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
if (col < 0) {
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
@ -1092,6 +1108,11 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
return -1;
}
col_id_t colId = pOld->pTags[col].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
@ -1104,7 +1125,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
return 0;
}
static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name);
if (col < 0) {
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
@ -1121,6 +1142,11 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
return -1;
}
col_id_t colId = pOld->pTags[col].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
@ -1199,7 +1225,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
taosRLockLatch(&pStb->lock);
@ -1269,13 +1294,13 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
return code;
}
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) {
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont,
int32_t *pLen) {
int ret;
SEncoder ec = {0};
uint32_t contLen = 0;
uint32_t contLen = 0;
SMAlterStbRsp alterRsp = {0};
SName name = {0};
SName name = {0};
tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
@ -1283,20 +1308,20 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
if (ret) {
tFreeSMAlterStbRsp(&alterRsp);
return ret;
}
tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, ret);
if (ret) {
tFreeSMAlterStbRsp(&alterRsp);
return ret;
}
void* cont = taosMemoryMalloc(contLen);
void *cont = taosMemoryMalloc(contLen);
tEncoderInit(&ec, cont, contLen);
tEncodeSMAlterStbRsp(&ec, &alterRsp);
tEncoderClear(&ec);
@ -1305,24 +1330,24 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
*pCont = cont;
*pLen = contLen;
return 0;
}
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
bool needRsp = true;
bool needRsp = true;
int32_t code = -1;
STrans *pTrans = NULL;
SField *pField0 = NULL;
SStbObj stbObj = {0};
taosRLockLatch(&pOld->lock);
memcpy(&stbObj, pOld, sizeof(SStbObj));
taosRUnLockLatch(&pOld->lock);
stbObj.pColumns = NULL;
stbObj.pTags = NULL;
stbObj.updateTime = taosGetTimestampMs();
stbObj.lock = 0;
taosRUnLockLatch(&pOld->lock);
int32_t code = -1;
STrans *pTrans = NULL;
SField *pField0 = NULL;
switch (pAlter->alterType) {
case TSDB_ALTER_TABLE_ADD_TAG:
@ -1330,25 +1355,25 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
break;
case TSDB_ALTER_TABLE_DROP_TAG:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndDropSuperTableTag(pOld, &stbObj, pField0->name);
code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
break;
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
code = mndAlterStbTagName(pOld, &stbObj, pAlter->pFields);
code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
break;
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndAlterStbTagBytes(pOld, &stbObj, pField0);
code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
break;
case TSDB_ALTER_TABLE_ADD_COLUMN:
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
break;
case TSDB_ALTER_TABLE_DROP_COLUMN:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndDropSuperTableColumn(pOld, &stbObj, pField0->name);
code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
pField0 = taosArrayGet(pAlter->pFields, 0);
code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
break;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
needRsp = false;
@ -1370,12 +1395,12 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
mndTransSetDbName(pTrans, pDb->name);
if (needRsp) {
void* pCont = NULL;
void *pCont = NULL;
int32_t contLen = 0;
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen)) goto _OVER;
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;

View File

@ -71,7 +71,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
return strchr(topic, '.') + 1;
}
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colAndTagIds) {
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
bool found = false;
@ -105,20 +105,21 @@ bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *col
}
}
for (int32_t i = 0; i < taosArrayGetSize(colAndTagIds); i++) {
int16_t *pColId = taosArrayGet(colAndTagIds, i);
if (taosHashGet(pColHash, pColId, sizeof(int16_t)) != NULL) {
found = true;
goto NEXT;
}
if (taosHashGet(pColHash, &colId, sizeof(int16_t)) != NULL) {
found = true;
goto NEXT;
}
NEXT:
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
if (found) return false;
if (found) {
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
return -1;
}
}
return true;
return 0;
}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {

View File

@ -103,7 +103,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexMultiTermDestroy(terms);
#endif
return -1;
return 0;
}
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {

View File

@ -52,6 +52,7 @@ enum {
CTG_OP_UPDATE_VGROUP = 0,
CTG_OP_UPDATE_TB_META,
CTG_OP_DROP_DB_CACHE,
CTG_OP_DROP_DB_VGROUP,
CTG_OP_DROP_STB_META,
CTG_OP_DROP_TB_META,
CTG_OP_UPDATE_USER,
@ -266,26 +267,32 @@ typedef struct SCtgUpdateTblMsg {
STableMetaOutput* output;
} SCtgUpdateTblMsg;
typedef struct SCtgRemoveDBMsg {
typedef struct SCtgDropDBMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
} SCtgRemoveDBMsg;
} SCtgDropDBMsg;
typedef struct SCtgRemoveStbMsg {
typedef struct SCtgDropDbVgroupMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDropDbVgroupMsg;
typedef struct SCtgDropStbMetaMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid;
} SCtgRemoveStbMsg;
} SCtgDropStbMetaMsg;
typedef struct SCtgRemoveTblMsg {
typedef struct SCtgDropTblMetaMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
} SCtgRemoveTblMsg;
} SCtgDropTblMetaMsg;
typedef struct SCtgUpdateUserMsg {
SCatalog* pCtg;
@ -451,6 +458,7 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropDbCache(SCtgCacheOperation *action);
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *action);
int32_t ctgOpDropStbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropTbMeta(SCtgCacheOperation *action);
int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
@ -464,6 +472,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);

View File

@ -286,6 +286,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t taskIdx = 0;
for (int32_t i = 0; i < dbVgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
if (pReq->forceUpdate) {
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
}
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
}
@ -301,6 +304,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
for (int32_t i = 0; i < tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
if (pReq->forceUpdate) {
catalogRemoveTableMeta(pCtg, name);
}
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
}

View File

@ -35,6 +35,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
"drop DB",
ctgOpDropDbCache
},
{
CTG_OP_DROP_DB_VGROUP,
"drop DBVgroup",
ctgOpDropDbVgroup
},
{
CTG_OP_DROP_STB_META,
"drop stbMeta",
@ -563,9 +568,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE};
SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg));
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
@ -590,13 +595,43 @@ _return:
CTG_RET(code);
}
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp};
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
action.data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(action.data);
CTG_RET(code);
}
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp};
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
@ -623,9 +658,9 @@ _return:
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp};
SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg));
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
@ -1281,7 +1316,7 @@ _return:
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveDBMsg *msg = operation->data;
SCtgDropDBMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
@ -1304,6 +1339,33 @@ _return:
CTG_RET(code);
}
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgDropDbVgroupMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
goto _return;
}
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
ctgFreeVgInfo(dbCache->vgInfo);
dbCache->vgInfo = NULL;
ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);
ctgWReleaseVgInfo(dbCache);
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
@ -1353,7 +1415,7 @@ _return:
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveStbMsg *msg = operation->data;
SCtgDropStbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
@ -1399,7 +1461,7 @@ _return:
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveTblMsg *msg = operation->data;
SCtgDropTblMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;

View File

@ -132,7 +132,22 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
}
}
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) {
/*
prepare SQL:
create database db1;
use db1;
create stable st1 (ts timestamp, f1 int) tags(t1 int);
create table tb1 using st1 tags(1);
insert into tb1 values (now, 1);
create qnode on dnode 1;
create user user1 pass "abc";
create database db2;
grant write on db2.* to user1;
create function udf1 as '/tmp/libudf1.so' outputtype int;
create aggregate function udf2 as '/tmp/libudf2.so' outputtype int;
*/
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) {
int32_t code = 0;
SCatalogReq req = {0};
req.pTableMeta = taosArrayInit(2, sizeof(SName));
@ -144,6 +159,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo));
req.qNodeRequired = true;
req.forceUpdate = forceUpdate;
SName name = {0};
char dbFName[TSDB_DB_FNAME_LEN] = {0};

View File

@ -38,7 +38,7 @@ typedef struct SIFParam {
col_id_t colId;
int64_t suid; // add later
char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char colName[TSDB_COL_NAME_LEN * 2 + 4];
SIndexMetaArg arg;
} SIFParam;

View File

@ -34,6 +34,7 @@ extern "C" {
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000
enum {
QW_PHASE_PRE_QUERY = 1,
@ -137,7 +138,7 @@ typedef struct SQWTaskCtx {
} SQWTaskCtx;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
int64_t hbBrokenTs; // timestamp in msecond
SRWLatch hbConnLock;
SRpcHandleInfo hbConnInfo;
SQueryNodeEpId hbEpId;
@ -354,6 +355,8 @@ int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);

View File

@ -535,3 +535,9 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
return -1;
}
void qwClearExpiredSch(SArray* pExpiredSch) {
}

View File

@ -21,10 +21,12 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
QW_LOCK(QW_WRITE, &sch->hbConnLock);
sch->hbBrokenTs = taosGetTimestampMs();
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
sch->hbConnInfo.handle = NULL;
@ -794,6 +796,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
SQWHbInfo *rspList = NULL;
SArray *pExpiredSch = NULL;
int32_t code = 0;
qwDbgDumpMgmtInfo(mgmt);
@ -809,8 +812,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
}
rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
if (NULL == rspList) {
pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
if (NULL == rspList || NULL == pExpiredSch) {
QW_UNLOCK(QW_READ, &mgmt->schLock);
taosMemoryFree(rspList);
taosArrayDestroy(pExpiredSch);
QW_ELOG("calloc %d SQWHbInfo failed", schNum);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
qwRelease(refId);
@ -820,6 +826,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
int64_t currentMs = taosGetTimestampMs();
void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) {
@ -827,6 +834,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
if (NULL == sch->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch->tasksHash) <= 0) {
taosArrayPush(pExpiredSch, sId);
}
pIter = taosHashIterate(mgmt->schHash, pIter);
continue;
}
@ -852,7 +864,12 @@ _return:
tFreeSSchedulerHbRsp(&rspList[j].rsp);
}
if (taosArrayGetSize(pExpiredSch) > 0) {
qwClearExpiredSch(pExpiredSch);
}
taosMemoryFreeClear(rspList);
taosArrayDestroy(pExpiredSch);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
qwRelease(refId);

View File

@ -102,11 +102,11 @@ typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SRWLatch lock;
bool exit;
int32_t jobRef;
int32_t jobNum;
SSchStat stat;
SRWLatch hbLock;
SHashObj *hbConnections;
} SSchedulerMgmt;
@ -320,6 +320,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void* pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);

View File

@ -126,30 +126,6 @@ _return:
SCH_RET(code);
}
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
if (!pTask->registerdHb) {
return;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
return;
}
atomic_sub_fetch_64(&hb->taskNum, 1);
pTask->registerdHb = false;
}
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask);
@ -377,15 +353,21 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx));
if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) {
SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx);
} else {
SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx);
}
if (execIdx != pTask->execIdx) { // ignore it
SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx);
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
if (taosHashGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
@ -393,6 +375,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx));
nodeInfo->handle = handle;
SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx);
return TSDB_CODE_SUCCESS;
}
@ -403,7 +387,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pTask, handle, execIdx);
schUpdateTaskExecNode(pJob, pTask, handle, execIdx);
return TSDB_CODE_SUCCESS;
}
@ -551,6 +535,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("set %dth condidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
++addNum;
}
}
@ -1110,6 +1096,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_UNLOCK(SCH_WRITE, &parent->lock);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task %" PRIx64, readyNum, parent->taskId);
SCH_ERR_RET(schLaunchTask(pJob, parent));
}
}
@ -1186,7 +1173,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
SCH_TASK_DLOG("task has %d exec address", size);
SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
}
@ -1196,7 +1183,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
}
SCH_LOCK_TASK(pTask);
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) {
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx);
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
@ -1306,9 +1294,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execIdx++;
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx);
SCH_LOG_TASK_START_TS(pTask);
if (schJobNeedToStop(pJob, &status)) {
@ -1471,9 +1460,10 @@ void schFreeJobImpl(void *job) {
qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
atomic_sub_fetch_32(&schMgmt.jobNum, 1);
schCloseJobRef();
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
if (jobNum == 0) {
schCloseJobRef();
}
}
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,

View File

@ -648,31 +648,6 @@ _return:
SCH_RET(code);
}
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0;
SSchHbTrans hb = {0};
hb.trans.pTrans = pJob->pTrans;
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
if (code) {
schFreeRpcCtx(&hb.rpcCtx);
if (HASH_NODE_EXIST(code)) {
*exist = true;
return TSDB_CODE_SUCCESS;
}
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code);
}
return TSDB_CODE_SUCCESS;
}
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
SSchedulerHbReq req = {0};
int32_t code = 0;
@ -684,17 +659,20 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
req.sId = schMgmt.sId;
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
nodeEpId->ep.port);
SCH_ERR_RET(code);
return TSDB_CODE_SUCCESS;
}
SCH_LOCK(SCH_WRITE, &hb->lock);
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
memcpy(&trans, &hb->trans, sizeof(trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_ERR_RET(code);
@ -764,60 +742,6 @@ _return:
SCH_RET(code);
}
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SSchHbTrans *hb = NULL;
while (true) {
hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
bool exist = false;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist));
if (!exist) {
SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL));
}
continue;
}
break;
}
atomic_add_fetch_64(&hb->taskNum, 1);
pTask->registerdHb = true;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
int32_t code = 0;
SSchHbTrans *hb = NULL;
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SCH_LOCK(SCH_WRITE, &hb->lock);
memcpy(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
return TSDB_CODE_SUCCESS;
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
@ -1037,6 +961,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
isCandidateAddr = true;
SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx);
}
SEpSet epSet = addr->epSet;

View File

@ -21,17 +21,189 @@
#include "tref.h"
#include "trpc.h"
void schCleanClusterHb(void* pTrans) {
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
while (hb) {
if (hb->trans.pTrans == pTrans) {
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
}
hb = taosHashIterate(schMgmt.hbConnections, hb);
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
}
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY
int32_t code = 0;
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId->nodeId, epId->ep.fqdn, epId->ep.port);
return TSDB_CODE_SUCCESS;
}
int64_t taskNum = atomic_load_64(&hb->taskNum);
if (taskNum <= 0) {
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
return TSDB_CODE_SUCCESS;
}
int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0;
SSchHbTrans hb = {0};
hb.trans.pTrans = pJob->pTrans;
hb.taskNum = 1;
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
if (code) {
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
schFreeRpcCtx(&hb.rpcCtx);
if (HASH_NODE_EXIST(code)) {
*exist = true;
return TSDB_CODE_SUCCESS;
}
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code);
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
return TSDB_CODE_SUCCESS;
}
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *pEpId) {
SSchHbTrans *hb = NULL;
while (true) {
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
hb = taosHashGet(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
bool exist = false;
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_ERR_RET(schAddHbConnection(pJob, pTask, pEpId, &exist));
if (!exist) {
SCH_RET(schBuildAndSendHbMsg(pEpId, NULL));
}
continue;
}
break;
}
atomic_add_fetch_64(&hb->taskNum, 1);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
return TSDB_CODE_SUCCESS;
}
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
if (!pTask->registerdHb) {
return;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_TASK_WLOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
return;
}
int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
if (0 == taskNum) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
schRemoveHbConnection(pJob, pTask, &epId);
} else {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
}
pTask->registerdHb = false;
}
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
pTask->registerdHb = true;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
int32_t code = 0;
SSchHbTrans *hb = NULL;
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SCH_LOCK(SCH_WRITE, &hb->lock);
memcpy(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
return TSDB_CODE_SUCCESS;
}
void schCloseJobRef(void) {
if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
return;
}
SCH_LOCK(SCH_WRITE, &schMgmt.lock);
if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
if (schMgmt.jobRef >= 0) {
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = -1;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
}
uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
@ -88,4 +260,3 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
(*pCtx->freeFunc)(pCtx->brokenVal.val);
}

View File

@ -182,6 +182,14 @@ int32_t scheduleCancelJob(int64_t job) {
SCH_RET(code);
}
void schedulerStopQueryHb(void *pTrans) {
if (NULL == pTrans) {
return;
}
schCleanClusterHb(pTrans);
}
void schedulerFreeJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
@ -220,6 +228,7 @@ void schedulerDestroy(void) {
}
}
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
if (schMgmt.hbConnections) {
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
while (pIter != NULL) {
@ -230,4 +239,5 @@ void schedulerDestroy(void) {
taosHashCleanup(schMgmt.hbConnections);
schMgmt.hbConnections = NULL;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
}

View File

@ -180,6 +180,12 @@ static bool addHandleToAcceptloop(void* arg);
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("server conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
@ -1154,6 +1160,10 @@ int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
}
SExHandle* ex = thandle;
SSvrConn* pConn = ex->handle;
if (pConn == NULL) {
tTrace("invalid handle %p, failed to Get Conn info", thandle);
return -1;
}
struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);

View File

@ -225,14 +225,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist")
// mnode-stable
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "Stable not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "Stable confilct with topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "STable not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "STable confilct with topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STBS, "Too many stables")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB, "Invalid stable name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_ALTER_OPTION, "Invalid stable alter options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "Stable option unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "STable option unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREADY_EXIST, "Tag already exists")
@ -240,6 +240,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
// mnode-infoSchema