Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-29367-8

This commit is contained in:
Hongze Cheng 2024-07-23 17:48:24 +08:00
commit 9f4c993741
5 changed files with 328 additions and 250 deletions

View File

@ -12,173 +12,179 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndDef.h"
#include "mndConsumer.h"
#include "taoserror.h"
static void *freeStreamTasks(SArray *pTaskLevel);
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
TAOS_CHECK_RETURN(tStartEncode(pEncoder));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->version));
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->totalLevel));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->smaId));
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->uid));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->status));
if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.igExpired));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.trigger));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->conf.fillHistory));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.triggerParam));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->conf.watermark));
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->sourceDbUid));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetDbUid));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sourceDb));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetDb));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->targetSTbName));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->targetStbUid));
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->fixedSinkVgId));
if (pObj->sql != NULL) {
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->sql));
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
}
if (pObj->ast != NULL) {
if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->ast));
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
}
if (pObj->physicalPlan != NULL) {
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->physicalPlan));
} else {
if (tEncodeCStr(pEncoder, "") < 0) return -1;
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
}
int32_t sz = taosArrayGetSize(pObj->tasks);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, sz));
for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray);
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, innerSz));
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGetP(pArray, j);
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
pTask->ver = SSTREAM_TASK_VER;
}
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeStreamTask(pEncoder, pTask));
}
}
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema));
// 3.0.20 ver =2
if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointFreq));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->igCheckUpdate));
// 3.0.50 ver = 3
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->subTableWithoutMd5) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->checkpointId));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->subTableWithoutMd5));
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1));
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
int32_t code = 0;
TAOS_CHECK_RETURN(tStartDecode(pDecoder));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->version));
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->totalLevel));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->smaId));
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->uid));
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->status));
if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.igExpired));
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.trigger));
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->conf.fillHistory));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.triggerParam));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->conf.watermark));
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->sourceDbUid));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetDbUid));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->sourceDb));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetDb));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->targetSTbName));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->targetStbUid));
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->fixedSinkVgId));
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->sql));
TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->ast));
TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan));
if (pObj->tasks != NULL) {
pObj->tasks = freeStreamTasks(pObj->tasks);
}
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) {
return -1;
}
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));
if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(void *));
if (pObj->tasks == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_RETURN(code);
}
for (int32_t i = 0; i < sz; i++) {
int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &innerSz));
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
if (pArray != NULL) {
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
taosArrayDestroy(pArray);
return -1;
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_RETURN(code);
}
if (tDecodeStreamTask(pDecoder, pTask) < 0) {
if ((code = tDecodeStreamTask(pDecoder, pTask)) < 0) {
taosMemoryFree(pTask);
taosArrayDestroy(pArray);
return -1;
TAOS_RETURN(code);
}
taosArrayPush(pArray, &pTask);
}
}
taosArrayPush(pObj->tasks, &pArray);
}
}
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema));
// 3.0.20
if (sver >= 2) {
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointFreq));
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->igCheckUpdate));
}
}
if (sver >= 3) {
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->checkpointId));
}
if (sver >= 5) {
if (tDecodeI8(pDecoder, &pObj->subTableWithoutMd5) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->subTableWithoutMd5));
}
if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->reserve));
tEndDecode(pDecoder);
return 0;
TAOS_RETURN(code);
}
void *freeStreamTasks(SArray *pTaskLevel) {
@ -220,7 +226,10 @@ void tFreeStreamObj(SStreamObj *pStream) {
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL;
if (pVgEpNew == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pVgEpNew->vgId = pVgEp->vgId;
// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
pVgEpNew->epSet = pVgEp->epSet;
@ -256,6 +265,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe) {
terrno = 0;
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -531,8 +541,10 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
}
SMqSubscribeObj *tNewSubscribeObj(const char *key) {
terrno = 0;
SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -548,8 +560,12 @@ SMqSubscribeObj *tNewSubscribeObj(const char *key) {
}
SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
terrno = 0;
SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL;
if (pSubNew == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock);

View File

@ -165,15 +165,24 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
if (pTrans == NULL) goto _OVER;
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mInfo("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
if (pRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;
mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
@ -563,10 +572,7 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
SStatisReq statisReq = {0};
int32_t code = -1;
if (tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return code;
}
TAOS_CHECK_RETURN(tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq));
if (tsMonitorLogProtocol) {
mInfo("process statis req,\n %s", statisReq.pCont);
@ -586,23 +592,28 @@ static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
int32_t code = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
if (pTrans == NULL) {
code = terrno;
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _exit;
}
pDnode->updateTime = taosGetTimestampMs();
SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _exit;
}
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
code = terrno;
goto _exit;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
code = terrno;
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
goto _exit;
}
@ -617,10 +628,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL;
int32_t code = -1;
if (tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER);
int64_t clusterid = mndGetClusterId(pMnode);
if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
@ -634,6 +642,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode == NULL) {
mInfo("dnode:%s, not created yet", statusReq.dnodeEp);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
} else {
@ -871,16 +881,25 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
if (pTrans == NULL) goto _OVER;
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
if (pRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;
mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
@ -901,7 +920,7 @@ static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
rsp.dnodeList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == rsp.dnodeList) {
mError("failed to alloc epSet while process dnode list req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
@ -922,7 +941,7 @@ static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
@ -935,12 +954,12 @@ static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) {
_OVER:
if (code != 0) {
mError("failed to get dnode list since %s", terrstr());
mError("failed to get dnode list since %s", tstrerror(code));
}
tFreeSDnodeListRsp(&rsp);
return code;
TAOS_RETURN(code);
}
static void getSlowLogScopeString(int32_t scope, char* result){
@ -980,7 +999,7 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
rsp.variables = taosArrayInit(16, sizeof(SVariablesInfo));
if (NULL == rsp.variables) {
mError("failed to alloc SVariablesInfo array while process show variables req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
@ -1036,7 +1055,7 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
@ -1049,11 +1068,11 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
_OVER:
if (code != 0) {
mError("failed to get show variables info since %s", terrstr());
mError("failed to get show variables info since %s", tstrerror(code));
}
tFreeSShowVariablesRsp(&rsp);
return code;
TAOS_RETURN(code);
}
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
@ -1062,23 +1081,17 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0};
if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0 || (terrno = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
code = terrno;
if ((code = grantCheck(TSDB_GRANT_DNODE)) != 0 || (code = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
goto _OVER;
}
if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
TAOS_CHECK_GOTO(tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
mInfo("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) {
goto _OVER;
}
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE), NULL, _OVER);
if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
code = TSDB_CODE_MND_INVALID_DNODE_EP;
goto _OVER;
}
@ -1086,7 +1099,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
pDnode = mndAcquireDnodeByEp(pMnode, ep);
if (pDnode != NULL) {
terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
code = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
goto _OVER;
}
@ -1103,12 +1116,12 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr());
mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, tstrerror(code));
}
mndReleaseDnode(pMnode, pDnode);
tFreeSCreateDnodeReq(&createReq);
return code;
TAOS_RETURN(code);
}
extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq);
@ -1126,44 +1139,56 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
STrans *pTrans = NULL;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
if (pTrans == NULL) goto _OVER;
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
pRaw = mndDnodeActionEncode(pDnode);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendRedolog(pTrans, pRaw) != 0) goto _OVER;
if (pRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRaw), NULL, _OVER);
(void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
pRaw = NULL;
pRaw = mndDnodeActionEncode(pDnode);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
if (pRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), NULL, _OVER);
(void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
pRaw = NULL;
if (pMObj != NULL) {
mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force), NULL, _OVER);
}
if (pQObj != NULL) {
mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force), NULL, _OVER);
}
if (pSObj != NULL) {
mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force), NULL, _OVER);
}
if (numOfVnodes > 0) {
mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force, unsafe), NULL, _OVER);
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP, 1);
code = 0;
@ -1171,7 +1196,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
return code;
TAOS_RETURN(code);
}
static bool mndIsEmptyDnode(SMnode *pMnode, int32_t dnodeId) {
@ -1209,16 +1234,11 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
SSnodeObj *pSObj = NULL;
SDropDnodeReq dropReq = {0};
if (tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
TAOS_CHECK_GOTO(tDeserializeSDropDnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port,
dropReq.force ? "true" : "false", dropReq.unsafe ? "true" : "false");
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
goto _OVER;
}
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER);
bool force = dropReq.force;
if (dropReq.unsafe) {
@ -1232,7 +1252,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
snprintf(ep, sizeof(ep), dropReq.fqdn, dropReq.port);
pDnode = mndAcquireDnodeByEp(pMnode, ep);
if (pDnode == NULL) {
terrno = err;
code = err;
goto _OVER;
}
}
@ -1242,11 +1262,11 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pMObj != NULL) {
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
code = TSDB_CODE_MND_TOO_FEW_MNODES;
goto _OVER;
}
if (pMnode->selfDnodeId == dropReq.dnodeId) {
terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
code = TSDB_CODE_MND_CANT_DROP_LEADER;
goto _OVER;
}
}
@ -1255,16 +1275,16 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
if (isonline && force) {
terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
code = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
bool isEmpty = mndIsEmptyDnode(pMnode, pDnode->id);
if (!isonline && !force && !isEmpty) {
terrno = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
code = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, tstrerror(code),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
@ -1279,7 +1299,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, tstrerror(code));
}
mndReleaseDnode(pMnode, pDnode);
@ -1287,11 +1307,11 @@ _OVER:
mndReleaseQnode(pMnode, pQObj);
mndReleaseSnode(pMnode, pSObj);
tFreeSDropDnodeReq(&dropReq);
return code;
TAOS_RETURN(code);
}
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
terrno = 0;
int32_t code = 0;
char *p = pMCfgReq->config;
while (*p) {
if (*p == ' ') {
@ -1314,12 +1334,12 @@ static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
strcpy(pDCfgReq->value, pMCfgReq->value);
}
return 0;
TAOS_RETURN(code);
_err:
mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
code = TSDB_CODE_INVALID_CFG;
TAOS_RETURN(code);
}
static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
@ -1349,23 +1369,21 @@ static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq
}
if (code == -1) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
code = TSDB_CODE_MND_DNODE_NOT_EXIST;
}
return code;
TAOS_RETURN(code);
}
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
int32_t code = 0;
SMnode *pMnode = pReq->info.node;
SMCfgDnodeReq cfgReq = {0};
if (tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
int8_t updateIpWhiteList = 0;
mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE) != 0) {
if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
TAOS_RETURN(code);
}
SDCfgDnodeReq dcfgReq = {0};
@ -1381,26 +1399,26 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (flag > 1024 * 1024 || (flag > -1 && flag < 1024) || flag < -1) {
mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [1024, 1024 * 1024]",
cfgReq.dnodeId, flag);
terrno = TSDB_CODE_INVALID_CFG;
code = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
TAOS_RETURN(code);
}
strcpy(dcfgReq.config, "s3blocksize");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
#endif
} else {
if (mndMCfg2DCfg(&cfgReq, &dcfgReq)) goto _err_out;
TAOS_CHECK_GOTO (mndMCfg2DCfg(&cfgReq, &dcfgReq), NULL, _err_out);
if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId);
terrno = TSDB_CODE_INVALID_CFG;
code = TSDB_CODE_INVALID_CFG;
goto _err_out;
}
if (strncasecmp(dcfgReq.config, "enableWhiteList", strlen("enableWhiteList")) == 0) {
updateIpWhiteList = 1;
}
if (cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true) != 0) goto _err_out;
TAOS_CHECK_GOTO(cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true), NULL, _err_out);
}
{ // audit
@ -1412,15 +1430,15 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
tFreeSMCfgDnodeReq(&cfgReq);
int32_t code = mndSendCfgDnodeReq(pMnode, cfgReq.dnodeId, &dcfgReq);
code = mndSendCfgDnodeReq(pMnode, cfgReq.dnodeId, &dcfgReq);
// dont care suss or succ;
if (updateIpWhiteList) mndRefreshUserIpWhiteList(pMnode);
return code;
TAOS_RETURN(code);
_err_out:
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
TAOS_RETURN(code);
}
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
@ -1503,17 +1521,16 @@ _exit:
}
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
int32_t code = 0;
#ifdef TD_ENTERPRISE
SMnode *pMnode = pReq->info.node;
SMCfgDnodeReq cfgReq = {0};
if (tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
TAOS_CHECK_RETURN(tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq));
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE) != 0) {
if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE)) != 0) {
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
TAOS_RETURN(code);
}
const STraceId *trace = &pReq->info.traceId;
SDCfgDnodeReq dcfgReq = {0};
@ -1523,13 +1540,13 @@ static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pReq) {
tFreeSMCfgDnodeReq(&cfgReq);
return mndProcessCreateEncryptKeyReqImpl(pReq, cfgReq.dnodeId, &dcfgReq);
} else {
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
code = TSDB_CODE_PAR_INTERNAL_ERROR;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
TAOS_RETURN(code);
}
#else
return 0;
TAOS_RETURN(code);
#endif
}
@ -1710,7 +1727,7 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
// get int32_t value from 'SMCfgDnodeReq'
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
terrno = 0;
int32_t code = 0;
if (' ' != pMCfgReq->config[optLen] && 0 != pMCfgReq->config[optLen]) {
goto _err;
}
@ -1725,12 +1742,12 @@ static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32
*pOutValue = atoi(pMCfgReq->value);
}
return 0;
TAOS_RETURN(code);
_err:
mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
code = TSDB_CODE_INVALID_CFG;
TAOS_RETURN(code);
}
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {

View File

@ -29,12 +29,14 @@ void reportStartup(const char *name, const char *desc) {}
void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
int32_t code = 0;
code = TSDB_CODE_INVALID_PTR;
TAOS_RETURN(code);
}
int32_t sendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
int32_t code = 0;
code = TSDB_CODE_INVALID_PTR;
TAOS_RETURN(code);
}
char *i642str(int64_t val) {

View File

@ -223,6 +223,7 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
}
static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
terrno = 0;
SSdb *pSdb = pMnode->pSdb;
SFuncObj *pFunc = sdbAcquire(pSdb, SDB_FUNC, funcName);
if (pFunc == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
@ -240,7 +241,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
int32_t code = -1;
STrans *pTrans = NULL;
if ((terrno = grantCheck(TSDB_GRANT_USER)) < 0) {
if ((code = grantCheck(TSDB_GRANT_USER)) < 0) {
return code;
}
@ -260,7 +261,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
func.codeSize = pCreate->codeLen;
func.pCode = taosMemoryMalloc(func.codeSize);
if (func.pCode == NULL || func.pCode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
@ -270,7 +271,11 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
memcpy(func.pCode, pCreate->pCode, func.codeSize);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func");
if (pTrans == NULL) goto _OVER;
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name);
@ -279,31 +284,61 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
func.createdTime = oldFunc->createdTime;
SSdbRaw *pRedoRaw = mndFuncActionEncode(oldFunc);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) goto _OVER;
if (pRedoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY), NULL, _OVER);
SSdbRaw *pUndoRaw = mndFuncActionEncode(oldFunc);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) goto _OVER;
if (pUndoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY), NULL, _OVER);
SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER;
if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
} else {
SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER;
if (pRedoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING), NULL, _OVER);
SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto _OVER;
if (pUndoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED), NULL, _OVER);
SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER;
if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;
@ -315,32 +350,48 @@ _OVER:
taosMemoryFree(func.pCode);
taosMemoryFree(func.pComment);
mndTransDrop(pTrans);
return code;
TAOS_RETURN(code);
}
static int32_t mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "drop-func");
if (pTrans == NULL) goto _OVER;
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
mInfo("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
if (pRedoRaw == NULL) goto _OVER;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER;
if (pRedoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
(void)sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
if (pUndoRaw == NULL) goto _OVER;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER;
if (pUndoRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
(void)sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
if (pCommitRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;
@ -355,18 +406,14 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
SFuncObj *pFunc = NULL;
SCreateFuncReq createReq = {0};
if (tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
#ifdef WINDOWS
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
code = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) {
goto _OVER;
}
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC), NULL, _OVER);
pFunc = mndAcquireFunc(pMnode, createReq.name);
if (pFunc != NULL) {
@ -378,7 +425,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
mInfo("func:%s, replace function is set", createReq.name);
code = 0;
} else {
terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
code = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
goto _OVER;
}
} else if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
@ -386,22 +433,22 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
}
if (createReq.name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
code = TSDB_CODE_MND_INVALID_FUNC_NAME;
goto _OVER;
}
if (createReq.pCode == NULL) {
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
code = TSDB_CODE_MND_INVALID_FUNC_CODE;
goto _OVER;
}
if (createReq.codeLen <= 1) {
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
code = TSDB_CODE_MND_INVALID_FUNC_CODE;
goto _OVER;
}
if (createReq.bufSize < 0 || createReq.bufSize > TSDB_FUNC_BUF_SIZE) {
terrno = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
code = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
goto _OVER;
}
@ -410,12 +457,12 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("func:%s, failed to create since %s", createReq.name, terrstr());
mError("func:%s, failed to create since %s", createReq.name, tstrerror(code));
}
mndReleaseFunc(pMnode, pFunc);
tFreeSCreateFuncReq(&createReq);
return code;
TAOS_RETURN(code);
}
static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
@ -424,18 +471,13 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
SFuncObj *pFunc = NULL;
SDropFuncReq dropReq = {0};
if (tDeserializeSDropFuncReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
TAOS_CHECK_GOTO(tDeserializeSDropFuncReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
mInfo("func:%s, start to drop", dropReq.name);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) {
goto _OVER;
}
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC), NULL, _OVER);
if (dropReq.name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_FUNC_NAME;
code = TSDB_CODE_MND_INVALID_FUNC_NAME;
goto _OVER;
}
@ -446,7 +488,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
code = 0;
goto _OVER;
} else {
terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
code = TSDB_CODE_MND_FUNC_NOT_EXIST;
goto _OVER;
}
}
@ -456,11 +498,11 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("func:%s, failed to drop since %s", dropReq.name, terrstr());
mError("func:%s, failed to drop since %s", dropReq.name, tstrerror(code));
}
mndReleaseFunc(pMnode, pFunc);
return code;
TAOS_RETURN(code);
}
static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
@ -470,25 +512,25 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
SRetrieveFuncRsp retrieveRsp = {0};
if (tDeserializeSRetrieveFuncReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
code = TSDB_CODE_INVALID_MSG;
goto RETRIEVE_FUNC_OVER;
}
if (retrieveReq.numOfFuncs <= 0 || retrieveReq.numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
terrno = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
code = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
goto RETRIEVE_FUNC_OVER;
}
retrieveRsp.numOfFuncs = retrieveReq.numOfFuncs;
retrieveRsp.pFuncInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncInfo));
if (retrieveRsp.pFuncInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER;
}
retrieveRsp.pFuncExtraInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncExtraInfo));
if (retrieveRsp.pFuncExtraInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER;
}
@ -497,6 +539,7 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
if (pFunc == NULL) {
if (terrno != 0) code = terrno;
goto RETRIEVE_FUNC_OVER;
}
@ -541,7 +584,7 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
int32_t contLen = tSerializeSRetrieveFuncRsp(NULL, 0, &retrieveRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER;
}
@ -556,7 +599,7 @@ RETRIEVE_FUNC_OVER:
tFreeSRetrieveFuncReq(&retrieveReq);
tFreeSRetrieveFuncRsp(&retrieveRsp);
return code;
TAOS_RETURN(code);
}
static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int32_t len) {

View File

@ -549,7 +549,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
return 0;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {