diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8dd95cffab..1e614ee536 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3905,6 +3905,7 @@ typedef struct { typedef struct { char name[TSDB_VIEW_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; + char* sql; int8_t orReplace; int8_t precision; int32_t numOfCols; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c819247f4d..55e0000478 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -383,15 +383,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TOO_MANY_STREAMS TAOS_DEF_ERROR_CODE(0, 0x03F6) #define TSDB_CODE_MND_INVALID_TARGET_TABLE TAOS_DEF_ERROR_CODE(0, 0x03F7) -// mnode-sma -#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) -#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0481) -#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482) -// mnode-tag-indxe - -#define TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0483) -#define TSDB_CODE_MND_TAG_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0484) // dnode // #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) // 2.x @@ -418,6 +410,22 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MNODE_NO_NEED_RESTORE TAOS_DEF_ERROR_CODE(0, 0x0415) // internal #define TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0416) +// mnode-sma +#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) +#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0481) +#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482) + +// mnode-tag-indxe + +#define TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0483) +#define TSDB_CODE_MND_TAG_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0484) + + +// mnode-view +#define TSDB_CODE_MND_VIEW_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x04A0) +#define TSDB_CODE_MND_VIEW_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x04A1) + + // vnode // #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x // #define TSDB_CODE_VND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0501) // 2.x diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ed1bbafe42..2f3fec0f60 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -729,55 +729,7 @@ void tFreeStreamObj(SStreamObj* pObj); // } SStreamCheckpointObj; -typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - // ctl - SRWLatch lock; - // create info - int64_t createTime; - int64_t updateTime; - int32_t version; - int32_t totalLevel; - int64_t smaId; // 0 for unused - // info - int64_t uid; - int8_t status; - SStreamConf conf; - // source and target - int64_t sourceDbUid; - int64_t targetDbUid; - char sourceDb[TSDB_DB_FNAME_LEN]; - char targetDb[TSDB_DB_FNAME_LEN]; - char targetSTbName[TSDB_TABLE_FNAME_LEN]; - int64_t targetStbUid; - - // fixedSinkVg is not applicable for encode and decode - SVgObj fixedSinkVg; - int32_t fixedSinkVgId; // 0 for shuffle - - // transformation - char* sql; - char* ast; - char* physicalPlan; - SArray* tasks; // SArray> - - SArray* pHTasksList; // generate the results for already stored ts data - int64_t hTaskUid; // stream task for history ts data - - SSchemaWrapper outputSchema; - SSchemaWrapper tagSchema; - - // 3.0.20 - int64_t checkpointFreq; // ms - int64_t currentTick; // do not serialize - int64_t deleteMark; - int8_t igCheckUpdate; - - // 3.0.5. - int64_t checkpointId; - char reserve[256]; - -} SViewObj; +typedef SCMCreateViewReq SViewObj; int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj); int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver); diff --git a/source/dnode/mnode/impl/inc/mndView.h b/source/dnode/mnode/impl/inc/mndView.h index 19fd2a3fd4..7c6e102207 100755 --- a/source/dnode/mnode/impl/inc/mndView.h +++ b/source/dnode/mnode/impl/inc/mndView.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_MND_STREAM_H_ -#define _TD_MND_STREAM_H_ +#ifndef _TD_MND_VIEW_H_ +#define _TD_MND_VIEW_H_ #include "mndInt.h" @@ -22,27 +22,17 @@ extern "C" { #endif -int32_t mndInitStream(SMnode *pMnode); -void mndCleanupStream(SMnode *pMnode); +int32_t mndInitView(SMnode *pMnode); +void mndCleanupView(SMnode *pMnode); -SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); -void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); +SViewObj *mndAcquireView(SMnode *pMnode, char *viewName); +void mndReleaseView(SMnode *pMnode, SViewObj *pView); -SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); -SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); - -int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); -int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); - -// for sma -// TODO refactor -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); - -int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); +SSdbRaw *mndViewActionEncode(SViewObj *pView); +SSdbRow *mndViewActionDecode(SSdbRaw *pRaw); #ifdef __cplusplus } #endif -#endif /*_TD_MND_STREAM_H_*/ +#endif /*_TD_MND_VIEW_H_*/ diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5d150b731c..ff3a0c6a58 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -718,7 +718,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) { if (dbname != NULL) { - tstrncpy(pTrans->dbname, dbname, TSDB_TABLE_FNAME_LEN); + tstrncpy(pTrans->dbname, dbname, TSDB_DB_FNAME_LEN); } if (stbname != NULL) { tstrncpy(pTrans->stbname, stbname, TSDB_TABLE_FNAME_LEN); diff --git a/source/dnode/mnode/impl/src/mndView.c b/source/dnode/mnode/impl/src/mndView.c index 1e58b9f3a9..6182b0dbae 100755 --- a/source/dnode/mnode/impl/src/mndView.c +++ b/source/dnode/mnode/impl/src/mndView.c @@ -16,6 +16,14 @@ #include "mndView.h" int32_t mndInitView(SMnode *pMnode) { + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_VIEW, mndProcessCreateViewReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_VIEW, mndProcessDropViewReq); + mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveView); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView); + +#ifdef TD_ENTERPRISE SSdbTable table = { .sdbType = SDB_VIEW, .keyType = SDB_KEY_BINARY, @@ -26,18 +34,10 @@ int32_t mndInitView(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndViewActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); - mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck); - - mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveStream); - mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextStream); - - taosThreadMutexInit(&execNodeList.lock, NULL); - execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); - execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry)); - return sdbSetTable(pMnode->pSdb, table); +#else + return TSDB_CODE_SUCCESS; +#endif } void mndCleanupView(SMnode *pMnode) { @@ -47,499 +47,12 @@ void mndCleanupView(SMnode *pMnode) { mDebug("mnd view cleanup"); } -int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SViewObj *pObj) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; - - if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; - if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; - if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; - - if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; - - if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1; - - if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1; - if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; - - if (pObj->sql != NULL) { - if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; - } else { - if (tEncodeCStr(pEncoder, "") < 0) return -1; - } - - if (pObj->ast != NULL) { - if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1; - } else { - if (tEncodeCStr(pEncoder, "") < 0) return -1; - } - - if (pObj->physicalPlan != NULL) { - if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; - } else { - if (tEncodeCStr(pEncoder, "") < 0) return -1; - } - - int32_t sz = taosArrayGetSize(pObj->tasks); - if (tEncodeI32(pEncoder, sz) < 0) return -1; - for (int32_t i = 0; i < sz; i++) { - SArray *pArray = taosArrayGetP(pObj->tasks, i); - int32_t innerSz = taosArrayGetSize(pArray); - if (tEncodeI32(pEncoder, innerSz) < 0) return -1; - for (int32_t j = 0; j < innerSz; j++) { - SStreamTask *pTask = taosArrayGetP(pArray, j); - pTask->ver = SSTREAM_TASK_VER; - if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; - } - } - - if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; - - // 3.0.20 ver =2 - if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1; - - // 3.0.50 ver = 3 - if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1; - - if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1; - - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeSStreamObj(SDecoder *pDecoder, SViewObj *pObj, int32_t sver) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; - - if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; - if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; - if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; - - if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; - - if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1; - - if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1; - if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; - - if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; - - pObj->tasks = NULL; - int32_t sz; - if (tDecodeI32(pDecoder, &sz) < 0) return -1; - if (sz != 0) { - pObj->tasks = taosArrayInit(sz, sizeof(void *)); - for (int32_t i = 0; i < sz; i++) { - int32_t innerSz; - if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; - SArray *pArray = taosArrayInit(innerSz, sizeof(void *)); - for (int32_t j = 0; j < innerSz; j++) { - SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - taosArrayDestroy(pArray); - return -1; - } - if (tDecodeStreamTask(pDecoder, pTask) < 0) { - taosMemoryFree(pTask); - taosArrayDestroy(pArray); - return -1; - } - taosArrayPush(pArray, &pTask); - } - taosArrayPush(pObj->tasks, &pArray); - } - } - - if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; - - // 3.0.20 - if (sver >= 2) { - if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1; - if (!tDecodeIsEnd(pDecoder)) { - if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1; - } - } - if (sver >= 3) { - if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1; - } - if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1; - - tEndDecode(pDecoder); - return 0; -} - -void tFreeStreamObj(SViewObj *pStream) { - taosMemoryFree(pStream->sql); - taosMemoryFree(pStream->ast); - taosMemoryFree(pStream->physicalPlan); - - if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) { - taosMemoryFree(pStream->outputSchema.pSchema); - } - - pStream->tasks = freeStreamTasks(pStream->tasks); - pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList); - - if (pStream->tagSchema.nCols > 0) { - taosMemoryFree(pStream->tagSchema.pSchema); - } -} - - -SSdbRaw *mndViewActionEncode(SViewObj *pStream) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - void *buf = NULL; - - SEncoder encoder; - tEncoderInit(&encoder, NULL, 0); - if (tEncodeSViewObj(&encoder, pStream) < 0) { - tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; - } - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - - int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); - if (pRaw == NULL) goto STREAM_ENCODE_OVER; - - buf = taosMemoryMalloc(tlen); - if (buf == NULL) goto STREAM_ENCODE_OVER; - - tEncoderInit(&encoder, buf, tlen); - if (tEncodeSStreamObj(&encoder, pStream) < 0) { - tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; - } - tEncoderClear(&encoder); - - int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); - SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); - - terrno = TSDB_CODE_SUCCESS; - -STREAM_ENCODE_OVER: - taosMemoryFreeClear(buf); - if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); - sdbFreeRaw(pRaw); - return NULL; - } - - mTrace("stream:%s, encode to raw:%p, row:%p", pStream->name, pRaw, pStream); - return pRaw; -} - -SSdbRow *mndViewActionDecode(SSdbRaw *pRaw) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRow *pRow = NULL; - SViewObj *pStream = NULL; - void *buf = NULL; - - int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) { - goto STREAM_DECODE_OVER; - } - - if (sver != MND_STREAM_VER_NUMBER) { - terrno = 0; - mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); - goto STREAM_DECODE_OVER; - } - - pRow = sdbAllocRow(sizeof(SViewObj)); - if (pRow == NULL) goto STREAM_DECODE_OVER; - - pStream = sdbGetRowObj(pRow); - if (pStream == NULL) goto STREAM_DECODE_OVER; - - int32_t tlen; - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER); - buf = taosMemoryMalloc(tlen + 1); - if (buf == NULL) goto STREAM_DECODE_OVER; - SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); - - SDecoder decoder; - tDecoderInit(&decoder, buf, tlen + 1); - if (tDecodeSStreamObj(&decoder, pStream, sver) < 0) { - tDecoderClear(&decoder); - goto STREAM_DECODE_OVER; - } - tDecoderClear(&decoder); - - terrno = TSDB_CODE_SUCCESS; - -STREAM_DECODE_OVER: - taosMemoryFreeClear(buf); - if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, - terrstr()); - taosMemoryFreeClear(pRow); - return NULL; - } - - mTrace("stream:%s, decode from raw:%p, row:%p", pStream->name, pRaw, pStream); - return pRow; -} - -static int32_t mndViewActionInsert(SSdb *pSdb, SViewObj *pView) { - mTrace("view:%s, perform insert action", pView->name); - return 0; -} - -static int32_t mndViewActionDelete(SSdb *pSdb, SViewObj *pStream) { - mTrace("stream:%s, perform delete action", pStream->name); - taosWLockLatch(&pStream->lock); - tFreeStreamObj(pStream); - taosWUnLockLatch(&pStream->lock); - return 0; -} - -static int32_t mndViewActionUpdate(SSdb *pSdb, SViewObj *pOldStream, SViewObj *pNewStream) { - mTrace("stream:%s, perform update action", pOldStream->name); - - atomic_exchange_32(&pOldStream->version, pNewStream->version); - - taosWLockLatch(&pOldStream->lock); - - pOldStream->status = pNewStream->status; - pOldStream->updateTime = pNewStream->updateTime; - - taosWUnLockLatch(&pOldStream->lock); - return 0; -} - -SViewObj *mndAcquireView(SMnode *pMnode, char *viewName) { - SSdb *pSdb = pMnode->pSdb; - SViewObj *pStream = sdbAcquire(pSdb, SDB_VIEW, viewName); - if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - } - return pStream; -} - -void mndReleaseStream(SMnode *pMnode, SViewObj *pStream) { - SSdb *pSdb = pMnode->pSdb; - sdbRelease(pSdb, pStream); -} - -static int32_t mndBuildViewObj(SMnode *pMnode, SViewObj *pObj, SCMCreateStreamReq *pCreate) { - SNode *pAst = NULL; - SQueryPlan *pPlan = NULL; - - mInfo("stream:%s to create", pCreate->name); - memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); - pObj->createTime = taosGetTimestampMs(); - pObj->updateTime = pObj->createTime; - pObj->version = 1; - pObj->smaId = 0; - - pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name)); - - char p[TSDB_STREAM_FNAME_LEN + 32] = {0}; - snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory"); - - pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name)); - pObj->status = 0; - - pObj->conf.igExpired = pCreate->igExpired; - pObj->conf.trigger = pCreate->triggerType; - pObj->conf.triggerParam = pCreate->maxDelay; - pObj->conf.watermark = pCreate->watermark; - pObj->conf.fillHistory = pCreate->fillHistory; - pObj->deleteMark = pCreate->deleteMark; - pObj->igCheckUpdate = pCreate->igUpdate; - - memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); - SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); - if (pSourceDb == NULL) { - mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); - return -1; - } - pObj->sourceDbUid = pSourceDb->uid; - mndReleaseDb(pMnode, pSourceDb); - - memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); - - SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); - if (pTargetDb == NULL) { - mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); - return -1; - } - tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN); - - if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) { - pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN); - } else { - pObj->targetStbUid = pCreate->targetStbUid; - } - pObj->targetDbUid = pTargetDb->uid; - mndReleaseDb(pMnode, pTargetDb); - - pObj->sql = pCreate->sql; - pObj->ast = pCreate->ast; - - pCreate->sql = NULL; - pCreate->ast = NULL; - - // deserialize ast - if (nodesStringToNode(pObj->ast, &pAst) < 0) { - goto FAIL; - } - - // extract output schema from ast - if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) { - goto FAIL; - } - - int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols); - if (numOfNULL > 0) { - pObj->outputSchema.nCols += numOfNULL; - SSchema *pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema)); - if (!pFullSchema) { - goto FAIL; - } - - int32_t nullIndex = 0; - int32_t dataIndex = 0; - for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { - SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); - if (nullIndex >= numOfNULL || i < pos->slotId) { - pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; - pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; - pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; - strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name); - pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; - dataIndex++; - } else { - pFullSchema[i].bytes = 0; - pFullSchema[i].colId = pos->colId; - pFullSchema[i].flags = COL_SET_NULL; - memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); - pFullSchema[i].type = pos->type; - nullIndex++; - } - } - taosMemoryFree(pObj->outputSchema.pSchema); - pObj->outputSchema.pSchema = pFullSchema; - } - - SPlanContext cxt = { - .pAstRoot = pAst, - .topicQuery = false, - .streamQuery = true, - .triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger, - .watermark = pObj->conf.watermark, - .igExpired = pObj->conf.igExpired, - .deleteMark = pObj->deleteMark, - .igCheckUpdate = pObj->igCheckUpdate, - }; - - // using ast and param to build physical plan - if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { - goto FAIL; - } - - // save physcial plan - if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { - goto FAIL; - } - - pObj->tagSchema.nCols = pCreate->numOfTags; - if (pCreate->numOfTags) { - pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); - } - /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ - for (int32_t i = 0; i < pCreate->numOfTags; i++) { - SField *pField = taosArrayGet(pCreate->pTags, i); - pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; - pObj->tagSchema.pSchema[i].bytes = pField->bytes; - pObj->tagSchema.pSchema[i].flags = pField->flags; - pObj->tagSchema.pSchema[i].type = pField->type; - memcpy(pObj->tagSchema.pSchema[i].name, pField->name, TSDB_COL_NAME_LEN); - } - -FAIL: - if (pAst != NULL) nodesDestroyNode(pAst); - if (pPlan != NULL) qDestroyQueryPlan(pPlan); - return 0; -} - - -static int32_t mndCreateView(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SRpcMsg *pReq) { - SUserObj userObj = {0}; - taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass); - tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN); - tstrncpy(userObj.acct, acct, TSDB_USER_LEN); - userObj.createdTime = taosGetTimestampMs(); - userObj.updateTime = userObj.createdTime; - userObj.superUser = 0; // pCreate->superUser; - userObj.sysInfo = pCreate->sysInfo; - userObj.enable = pCreate->enable; - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-user"); - if (pTrans == NULL) { - mError("user:%s, failed to create since %s", pCreate->user, terrstr()); - return -1; - } - mInfo("trans:%d, used to create user:%s", pTrans->id, pCreate->user); - - SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - mndTransDrop(pTrans); - return 0; -} - static int32_t mndProcessCreateViewReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SViewObj *pStream = NULL; - SDbObj *pDb = NULL; SCMCreateViewReq createViewReq = {0}; - SViewObj streamObj = {0}; +#ifndef TD_ENTERPRISE + return TSDB_CODE_OPS_NOT_SUPPORT; +#else if (tDeserializeSCMCreateViewReq(pReq->pCont, pReq->contLen, &createViewReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; @@ -547,248 +60,98 @@ static int32_t mndProcessCreateViewReq(SRpcMsg *pReq) { mInfo("start to create view:%s, sql:%s", createViewReq.name, createViewReq.sql); - pStream = mndAcquireStream(pMnode, createStreamReq.name); - if (pStream != NULL) { - if (createStreamReq.igExists) { - mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name); - code = 0; - goto _OVER; - } else { - terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; - goto _OVER; - } - } else if (terrno != TSDB_CODE_MND_STREAM_NOT_EXIST) { - goto _OVER; - } - - if (mndCreateView(pMnode, &streamObj, &createStreamReq) < 0) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); - goto _OVER; - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); - if (pTrans == NULL) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); - goto _OVER; - } - - mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); - - mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndTransDrop(pTrans); - goto _OVER; - } - - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb) != 0) { - mndTransDrop(pTrans); - goto _OVER; - } - - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb) != 0) { - mndTransDrop(pTrans); - goto _OVER; - } - - // execute creation - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - goto _OVER; - } - - mndTransDrop(pTrans); - - taosThreadMutexLock(&execNodeList.lock); - mDebug("register to stream task node list"); - keepStreamTasksInBuf(&streamObj, &execNodeList); - taosThreadMutexUnlock(&execNodeList.lock); - - code = TSDB_CODE_ACTION_IN_PROGRESS; - - char detail[2000] = {0}; - sprintf(detail, - "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 - ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64 - ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, - createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, - createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, - createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, - createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark); - - SName name = {0}; - tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB); - //reuse this function for stream - - auditRecord(pReq, pMnode->clusterId, "createView", name.dbname, "", detail); - -_OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); - } - - mndReleaseStream(pMnode, pStream); - - tFreeSCMCreateStreamReq(&createStreamReq); - tFreeStreamObj(&streamObj); - return code; + return mndProcessCreateViewReqImpl(&createViewReq, pReq); +#endif } -static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SViewObj *pStream = NULL; - - SMDropStreamReq dropReq = {0}; - if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - pStream = mndAcquireStream(pMnode, dropReq.name); - - if (pStream == NULL) { - if (dropReq.igNotExists) { - mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name); - sdbRelease(pMnode->pSdb, pStream); - return 0; - } else { - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - return -1; +static int32_t mndProcessDropViewReq(SRpcMsg *pReq) { +#ifndef TD_ENTERPRISE + return TSDB_CODE_OPS_NOT_SUPPORT; +#else + if (tDeserializeSCMDropViewReq(pReq->pCont, pReq->contLen, &createViewReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; } - } - - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { - sdbRelease(pMnode->pSdb, pStream); - return -1; - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream"); - if (pTrans == NULL) { - mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); - sdbRelease(pMnode->pSdb, pStream); - return -1; - } - - mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - // mndTransSetSerial(pTrans); - - // drop all tasks - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { - mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - - // drop stream - if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - - char detail[100] = {0}; - sprintf(detail, "igNotExists:%d", dropReq.igNotExists); - - SName name = {0}; - tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); - //reuse this function for stream - - auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail); - - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - - return TSDB_CODE_ACTION_IN_PROGRESS; + + mInfo("start to drop view:%s, sql:%s", createViewReq.name, createViewReq.sql); + + return mndProcessDropViewReqImpl(&createViewReq, pReq); +#endif } -static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { +static int32_t mndRetrieveView(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; - SViewObj *pStream = NULL; + SViewObj *pView = NULL; while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); + pShow->pIter = sdbFetch(pSdb, SDB_VIEW, pShow->pIter, (void **)&pView); if (pShow->pIter == NULL) break; SColumnInfoData *pColInfo; SName n; int32_t cols = 0; - char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + char viewName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(viewName, mndGetDbStr(pView->name), sizeof(viewName)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + colDataSetVal(pColInfo, numOfRows, (const char *)viewName, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pView->createTime, false); char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql)); + STR_WITH_MAXSIZE_TO_VARSTR(sql, pView->sql, sizeof(sql)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); char status[20 + VARSTR_HEADER_SIZE] = {0}; char status2[20] = {0}; - mndShowStreamStatus(status2, pStream); + mndShowViewStatus(status2, pView); STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB)); + STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pView->sourceDb), sizeof(sourceDB)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false); char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB)); + STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pView->targetDb), sizeof(targetDB)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false); - if (pStream->targetSTbName[0] == 0) { + if (pView->targetSTbName[0] == 0) { pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, NULL, true); } else { char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB)); + STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pView->targetSTbName), sizeof(targetSTB)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false); } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pView->conf.watermark, false); char trigger[20 + VARSTR_HEADER_SIZE] = {0}; char trigger2[20] = {0}; - mndShowStreamTrigger(trigger2, pStream); + mndShowViewTrigger(trigger2, pView); STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); numOfRows++; - sdbRelease(pSdb, pStream); + sdbRelease(pSdb, pView); } pShow->numOfRows += numOfRows; return numOfRows; } -static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { +static void mndCancelGetNextView(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); }