manage func obj

This commit is contained in:
Shengliang 2022-01-23 21:59:09 -08:00
parent cd377e4aa8
commit fde22bca36
5 changed files with 114 additions and 104 deletions

View File

@ -526,9 +526,9 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int8_t igExists;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
int8_t align;
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
@ -540,6 +540,7 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int8_t igNotExists;
} SDropFuncReq; } SDropFuncReq;
typedef struct { typedef struct {
@ -549,9 +550,9 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int8_t align;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
int8_t align;
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;

View File

@ -289,7 +289,6 @@ typedef struct {
int32_t codeSize; int32_t codeSize;
char *pComment; char *pComment;
char *pCode; char *pCode;
char pData[];
} SFuncObj; } SFuncObj;
typedef struct { typedef struct {

View File

@ -25,7 +25,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw); static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc); static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew);
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate); static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate);
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc); static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc);
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq); static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq);
@ -104,13 +104,11 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
goto FUNC_DECODE_OVER; goto FUNC_DECODE_OVER;
} }
int32_t size = sizeof(SFuncObj) + TSDB_FUNC_COMMENT_LEN + TSDB_FUNC_CODE_LEN; SSdbRow *pRow = sdbAllocRow(sizeof(SFuncObj));
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto FUNC_DECODE_OVER; if (pRow == NULL) goto FUNC_DECODE_OVER;
SFuncObj *pFunc = sdbGetRowObj(pRow); SFuncObj *pFunc = sdbGetRowObj(pRow);
if (pFunc == NULL) goto FUNC_DECODE_OVER; if (pFunc == NULL) goto FUNC_DECODE_OVER;
char *tmp = (char *)pFunc + sizeof(SFuncObj);
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER)
@ -124,9 +122,15 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, FUNC_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pFunc->pData, pFunc->commentSize + pFunc->codeSize, FUNC_DECODE_OVER)
pFunc->pComment = pFunc->pData; pFunc->pComment = calloc(1, pFunc->commentSize);
pFunc->pCode = (pFunc->pData + pFunc->commentSize); pFunc->pCode = calloc(1, pFunc->codeSize);
if (pFunc->pComment == NULL || pFunc->pCode == NULL) {
goto FUNC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_DECODE_OVER)
terrno = 0; terrno = 0;
@ -151,114 +155,104 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
return 0; return 0;
} }
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) { static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
mTrace("func:%s, perform update action, old row:%p new row:%p", pOldFunc->name, pOldFunc, pNewFunc); mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
return 0; return 0;
} }
static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
SSdb *pSdb = pMnode->pSdb;
SFuncObj *pFunc = sdbAcquire(pSdb, SDB_FUNC, funcName);
if (pFunc == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
}
return pFunc;
}
static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pFunc);
}
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) { static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) {
SFuncObj *pFunc = calloc(1, sizeof(SFuncObj) + pCreate->commentSize + pCreate->codeSize); int32_t code = -1;
pFunc->createdTime = taosGetTimestampMs(); STrans *pTrans = NULL;
pFunc->funcType = pCreate->funcType;
pFunc->scriptType = pCreate->scriptType;
pFunc->outputType = pCreate->outputType;
pFunc->outputLen = pCreate->outputLen;
pFunc->bufSize = pCreate->bufSize;
pFunc->signature = pCreate->signature;
pFunc->commentSize = pCreate->commentSize;
pFunc->codeSize = pCreate->codeSize;
pFunc->pComment = pFunc->pData;
memcpy(pFunc->pComment, pCreate->pCont, pCreate->commentSize);
pFunc->pCode = pFunc->pData + pCreate->commentSize;
memcpy(pFunc->pCode, pCreate->pCont + pCreate->commentSize, pFunc->codeSize);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); SFuncObj func = {0};
if (pTrans == NULL) { memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
free(pFunc); func.createdTime = taosGetTimestampMs();
mError("func:%s, failed to create since %s", pCreate->name, terrstr()); func.funcType = pCreate->funcType;
return -1; func.scriptType = pCreate->scriptType;
func.outputType = pCreate->outputType;
func.outputLen = pCreate->outputLen;
func.bufSize = pCreate->bufSize;
func.signature = pCreate->signature;
func.commentSize = pCreate->commentSize;
func.codeSize = pCreate->codeSize;
func.pComment = malloc(func.commentSize);
func.pCode = malloc(func.codeSize);
if (func.pCode == NULL || func.pCode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto CREATE_FUNC_OVER;
} }
memcpy(func.pComment, pCreate->pCont, pCreate->commentSize);
memcpy(func.pCode, pCreate->pCont + pCreate->commentSize, func.codeSize);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_FUNC_OVER;
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc); SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto CREATE_FUNC_OVER;
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto CREATE_FUNC_OVER;
free(pFunc);
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc); SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto CREATE_FUNC_OVER;
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto CREATE_FUNC_OVER;
free(pFunc);
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc); SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto CREATE_FUNC_OVER;
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto CREATE_FUNC_OVER;
free(pFunc);
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_FUNC_OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
free(pFunc); code = 0;
CREATE_FUNC_OVER:
free(func.pCode);
free(func.pComment);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) { static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) goto DROP_FUNC_OVER;
mError("func:%s, failed to drop since %s", pFunc->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name); mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc); SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc); SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc); SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_FUNC_OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
code = 0;
DROP_FUNC_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) { static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
@ -273,11 +267,19 @@ static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
mDebug("func:%s, start to create", pCreate->name); mDebug("func:%s, start to create", pCreate->name);
SFuncObj *pFunc = sdbAcquire(pMnode->pSdb, SDB_FUNC, pCreate->name); SFuncObj *pFunc = mndAcquireFunc(pMnode->pSdb, pCreate->name);
if (pFunc != NULL) { if (pFunc != NULL) {
sdbRelease(pMnode->pSdb, pFunc); mndReleaseFunc(pMnode->pSdb, pFunc);
terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST; if (pCreate->igExists) {
mError("func:%s, failed to create since %s", pCreate->name, terrstr()); mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
return 0;
} else {
terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
} else if (terrno != TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
} }
@ -312,7 +314,6 @@ static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) {
} }
int32_t code = mndCreateFunc(pMnode, pReq, pCreate); int32_t code = mndCreateFunc(pMnode, pReq, pCreate);
if (code != 0) { if (code != 0) {
mError("func:%s, failed to create since %s", pCreate->name, terrstr()); mError("func:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
@ -333,15 +334,19 @@ static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) {
return -1; return -1;
} }
SFuncObj *pFunc = sdbAcquire(pMnode->pSdb, SDB_FUNC, pDrop->name); SFuncObj *pFunc = mndAcquireFunc(pMnode->pSdb, pDrop->name);
if (pFunc == NULL) { if (pFunc == NULL) {
terrno = TSDB_CODE_MND_FUNC_NOT_EXIST; if (pDrop->igNotExists) {
mError("func:%s, failed to drop since %s", pDrop->name, terrstr()); mDebug("func:%s, not exist, ignore not exist is set", pDrop->name);
return -1; return 0;
} else {
terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
return -1;
}
} }
int32_t code = mndDropFunc(pMnode, pReq, pFunc); int32_t code = mndDropFunc(pMnode, pReq, pFunc);
if (code != 0) { if (code != 0) {
mError("func:%s, failed to drop since %s", pDrop->name, terrstr()); mError("func:%s, failed to drop since %s", pDrop->name, terrstr());
return -1; return -1;

View File

@ -123,8 +123,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
goto STB_DECODE_OVER; goto STB_DECODE_OVER;
} }
int32_t size = sizeof(SStbObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); SSdbRow *pRow = sdbAllocRow(sizeof(SStbObj));
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto STB_DECODE_OVER; if (pRow == NULL) goto STB_DECODE_OVER;
SStbObj *pStb = sdbGetRowObj(pRow); SStbObj *pStb = sdbGetRowObj(pRow);
@ -143,6 +142,9 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pStb->pSchema = calloc(totalCols, sizeof(SSchema)); pStb->pSchema = calloc(totalCols, sizeof(SSchema));
if (pStb->pSchema == NULL) {
goto STB_DECODE_OVER;
}
for (int32_t i = 0; i < totalCols; ++i) { for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i]; SSchema *pSchema = &pStb->pSchema[i];
@ -448,7 +450,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
stbObj.pSchema[i].colId = i + 1; stbObj.pSchema[i].colId = i + 1;
} }
int32_t code = 0; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_STB_OVER; if (pTrans == NULL) goto CREATE_STB_OVER;
@ -481,7 +483,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name);
if (pStb != NULL) { if (pStb != NULL) {
sdbRelease(pMnode->pSdb, pStb); mndReleaseStb(pMnode->pSdb, pStb);
if (pCreate->igExists) { if (pCreate->igExists) {
mDebug("stb:%s, already exist, ignore exist is set", pCreate->name); mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
return 0; return 0;
@ -492,6 +494,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
} }
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
} }
// topic should have different name with stb // topic should have different name with stb

View File

@ -390,9 +390,11 @@ static void mndTransDropActions(SArray *pArray) {
} }
void mndTransDrop(STrans *pTrans) { void mndTransDrop(STrans *pTrans) {
mndTransDropData(pTrans); if (pTrans != NULL) {
mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); mndTransDropData(pTrans);
tfree(pTrans); mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
tfree(pTrans);
}
} }
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {