|
|
|
@ -26,13 +26,13 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
|
|
|
|
|
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
|
|
|
|
|
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
|
|
|
|
|
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc);
|
|
|
|
|
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate);
|
|
|
|
|
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc);
|
|
|
|
|
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg);
|
|
|
|
|
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg);
|
|
|
|
|
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg);
|
|
|
|
|
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
|
|
|
static int32_t mndRetrieveFuncs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
|
|
|
|
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate);
|
|
|
|
|
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc);
|
|
|
|
|
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq);
|
|
|
|
|
static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq);
|
|
|
|
|
static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq);
|
|
|
|
|
static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
|
|
|
static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
|
|
|
|
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
|
|
|
|
|
|
|
|
|
|
int32_t mndInitFunc(SMnode *pMnode) {
|
|
|
|
@ -44,13 +44,13 @@ int32_t mndInitFunc(SMnode *pMnode) {
|
|
|
|
|
.updateFp = (SdbUpdateFp)mndFuncActionUpdate,
|
|
|
|
|
.deleteFp = (SdbDeleteFp)mndFuncActionDelete};
|
|
|
|
|
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNCTION, mndProcessCreateFuncMsg);
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNCTION, mndProcessDropFuncMsg);
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNCTION, mndProcessRetrieveFuncMsg);
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
|
|
|
|
|
|
|
|
|
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndGetFuncMeta);
|
|
|
|
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndRetrieveFuncs);
|
|
|
|
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNCTION, mndCancelGetNextFunc);
|
|
|
|
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndGetFuncMeta);
|
|
|
|
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
|
|
|
|
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);
|
|
|
|
|
|
|
|
|
|
return sdbSetTable(pMnode->pSdb, table);
|
|
|
|
|
}
|
|
|
|
@ -156,7 +156,7 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNe
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate) {
|
|
|
|
|
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) {
|
|
|
|
|
SFuncObj *pFunc = calloc(1, sizeof(SFuncObj) + pCreate->commentSize + pCreate->codeSize);
|
|
|
|
|
pFunc->createdTime = taosGetTimestampMs();
|
|
|
|
|
pFunc->funcType = pCreate->funcType;
|
|
|
|
@ -172,7 +172,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pC
|
|
|
|
|
pFunc->pCode = pFunc->pData + pCreate->commentSize;
|
|
|
|
|
memcpy(pFunc->pCode, pCreate->pCont + pCreate->commentSize, pFunc->codeSize);
|
|
|
|
|
|
|
|
|
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
|
|
|
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
|
|
|
|
if (pTrans == NULL) {
|
|
|
|
|
free(pFunc);
|
|
|
|
|
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
|
|
|
|
@ -219,8 +219,8 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pC
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
|
|
|
|
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
|
|
|
|
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
|
|
|
|
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
|
|
|
|
if (pTrans == NULL) {
|
|
|
|
|
mError("func:%s, failed to drop since %s", pFunc->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
@ -261,10 +261,10 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
SMnode *pMnode = pMsg->pMnode;
|
|
|
|
|
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
|
|
|
|
|
SMnode *pMnode = pReq->pMnode;
|
|
|
|
|
|
|
|
|
|
SCreateFuncReq *pCreate = pMsg->rpcMsg.pCont;
|
|
|
|
|
SCreateFuncReq *pCreate = pReq->rpcMsg.pCont;
|
|
|
|
|
pCreate->outputLen = htonl(pCreate->outputLen);
|
|
|
|
|
pCreate->bufSize = htonl(pCreate->bufSize);
|
|
|
|
|
pCreate->sigature = htobe64(pCreate->sigature);
|
|
|
|
@ -311,7 +311,7 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t code = mndCreateFunc(pMnode, pMsg, pCreate);
|
|
|
|
|
int32_t code = mndCreateFunc(pMnode, pReq, pCreate);
|
|
|
|
|
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
|
|
|
|
@ -321,9 +321,9 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
SMnode *pMnode = pMsg->pMnode;
|
|
|
|
|
SDropFuncReq *pDrop = pMsg->rpcMsg.pCont;
|
|
|
|
|
static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) {
|
|
|
|
|
SMnode *pMnode = pReq->pMnode;
|
|
|
|
|
SDropFuncReq *pDrop = pReq->rpcMsg.pCont;
|
|
|
|
|
|
|
|
|
|
mDebug("func:%s, start to drop", pDrop->name);
|
|
|
|
|
|
|
|
|
@ -340,7 +340,7 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t code = mndDropFunc(pMnode, pMsg, pFunc);
|
|
|
|
|
int32_t code = mndDropFunc(pMnode, pReq, pFunc);
|
|
|
|
|
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
|
|
|
|
@ -350,10 +350,10 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
SMnode *pMnode = pMsg->pMnode;
|
|
|
|
|
static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq) {
|
|
|
|
|
SMnode *pMnode = pReq->pMnode;
|
|
|
|
|
|
|
|
|
|
SRetrieveFuncReq *pRetrieve = pMsg->rpcMsg.pCont;
|
|
|
|
|
SRetrieveFuncReq *pRetrieve = pReq->rpcMsg.pCont;
|
|
|
|
|
pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs);
|
|
|
|
|
|
|
|
|
|
int32_t size = sizeof(SRetrieveFuncRsp) + (sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN) * pRetrieve->numOfFuncs + 16384;
|
|
|
|
@ -389,14 +389,14 @@ static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) {
|
|
|
|
|
pOutput += sizeof(SFuncInfo) + pFunc->commentSize + pFunc->codeSize;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pMsg->pCont = pRetrieveRsp;
|
|
|
|
|
pMsg->contLen = (int32_t)(pOutput - (char *)pRetrieveRsp);
|
|
|
|
|
pReq->pCont = pRetrieveRsp;
|
|
|
|
|
pReq->contLen = (int32_t)(pOutput - (char *)pRetrieveRsp);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
|
|
|
SMnode *pMnode = pMsg->pMnode;
|
|
|
|
|
static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
|
|
|
SMnode *pMnode = pReq->pMnode;
|
|
|
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
|
|
|
|
|
|
|
|
int32_t cols = 0;
|
|
|
|
@ -477,8 +477,8 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le
|
|
|
|
|
return tDataTypes[type].name;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndRetrieveFuncs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
|
|
|
|
SMnode *pMnode = pMsg->pMnode;
|
|
|
|
|
static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
|
|
|
|
SMnode *pMnode = pReq->pMnode;
|
|
|
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
|
|
|
int32_t numOfRows = 0;
|
|
|
|
|
SFuncObj *pFunc = NULL;
|
|
|
|
|