diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index 3757e5d761..3b024a8931 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -1,6 +1,7 @@ # rocksdb IF (NOT ${TD_LINUX}) + ExternalProject_Add(rocksdb URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b @@ -10,6 +11,6 @@ ExternalProject_Add(rocksdb CONFIGURE_COMMAND "" BUILD_COMMAND "" INSTALL_COMMAND "" - EST_COMMAND "" + TEST_COMMAND "" ) ENDIF(NOT ${TD_LINUX}) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d78e771fcf..9b392c0240 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3354,6 +3354,17 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteMqSubTopicEp); } +typedef struct { + int32_t vgId; + STqOffsetVal offset; + int64_t rows; +}OffsetRows; + +typedef struct{ + char topicName[TSDB_TOPIC_FNAME_LEN]; + SArray* offsetRows; +}TopicOffsetRows; + #define TD_AUTO_CREATE_TABLE 0x1 typedef struct { int64_t suid; diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index f0c9cffd0f..4e4275e5d8 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -216,7 +216,7 @@ bool fmIsUserDefinedFunc(int32_t funcId); bool fmIsDistExecFunc(int32_t funcId); bool fmIsForbidFillFunc(int32_t funcId); bool fmIsForbidStreamFunc(int32_t funcId); -bool fmIsForbidSuperTableFunc(int32_t funcId); +bool fmIsForbidSysTableFunc(int32_t funcId); bool fmIsIntervalInterpoFunc(int32_t funcId); bool fmIsInterpFunc(int32_t funcId); bool fmIsLastRowFunc(int32_t funcId); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 1aa08ff802..b19a0d783d 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -214,7 +214,6 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); void walRefFirstVer(SWal *, SWalRef *); void walRefLastVer(SWal *, SWalRef *); -SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); void walCloseRef(SWal *pWal, int64_t refId); diff --git a/include/util/talgo.h b/include/util/talgo.h index f9d51c4b5b..7c92c0fe87 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -31,7 +31,7 @@ typedef void *(*__array_item_dup_fn_t)(void *); typedef void (*FDelete)(void *); typedef int32_t (*FEncode)(void **buf, const void *dst); -typedef void *(*FDecode)(const void *buf, void *dst); +typedef void *(*FDecode)(const void *buf, void *dst, int8_t sver); #define TD_EQ 0x1 #define TD_GT 0x2 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9e5229870e..b1c8ada6a4 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -704,6 +704,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_TAGS_PC TAOS_DEF_ERROR_CODE(0, 0x2665) #define TSDB_CODE_PAR_INVALID_TIMELINE_QUERY TAOS_DEF_ERROR_CODE(0, 0x2666) #define TSDB_CODE_PAR_INVALID_OPTR_USAGE TAOS_DEF_ERROR_CODE(0, 0x2667) +#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2668) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/include/util/tarray.h b/include/util/tarray.h index 4bf24b46b9..022b14bb2c 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -244,7 +244,7 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param); int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode); -void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz); +void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver); #ifdef __cplusplus } diff --git a/packaging/checkPackageRuning.py b/packaging/checkPackageRuning.py index 2edeeb6dbb..96e2378fb3 100755 --- a/packaging/checkPackageRuning.py +++ b/packaging/checkPackageRuning.py @@ -42,8 +42,8 @@ else: # os.system("rm -rf /var/lib/taos/*") # os.system("systemctl restart taosd ") -# wait a moment ,at least 5 seconds -time.sleep(5) +# wait a moment ,at least 10 seconds +time.sleep(10) # prepare data by taosBenchmark diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 20b1f512d5..ea46b70693 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -23,10 +23,6 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { SEpSet epSet = {0}; dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet); - if(epSet.numOfEps == 0){ - return; - } - const int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); pMsg->pCont = rpcMallocCont(contLen); if (pMsg->pCont == NULL) { diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index f014b30a03..1564a09035 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -350,7 +350,7 @@ void dmRotateMnodeEpSet(SDnodeData *pData) { } void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) { - if(pData->validMnodeEps) return; + if(!pData->validMnodeEps) return; dmGetMnodeEpSet(pData, pEpSet); dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse); for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 82b714e6eb..3859dd688d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -559,24 +559,25 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer typedef struct { int32_t vgId; - char* qmsg; // SubPlanToString +// char* qmsg; // SubPlanToString SEpSet epSet; } SMqVgEp; SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); void tDeleteSMqVgEp(SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); -void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); +void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver); typedef struct { int64_t consumerId; // -1 for unassigned SArray* vgs; // SArray + SArray* offsetRows; // SArray } SMqConsumerEp; SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp); void tDeleteSMqConsumerEp(void* pEp); int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp); -void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp); +void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp, int8_t sver); typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -588,34 +589,36 @@ typedef struct { int64_t stbUid; SHashObj* consumerHash; // consumerId -> SMqConsumerEp SArray* unassignedVgs; // SArray + SArray* offsetRows; char dbName[TSDB_DB_FNAME_LEN]; + char* qmsg; // SubPlanToString } SMqSubscribeObj; SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub); void tDeleteSubscribeObj(SMqSubscribeObj* pSub); int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub); -void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub); +void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver); -typedef struct { - int32_t epoch; - SArray* consumers; // SArray -} SMqSubActionLogEntry; +//typedef struct { +// int32_t epoch; +// SArray* consumers; // SArray +//} SMqSubActionLogEntry; -SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); -void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); -int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry); -void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry); - -typedef struct { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - SArray* logs; // SArray -} SMqSubActionLogObj; - -SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog); -void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog); -int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog); -void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); +//SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); +//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); +//int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry); +//void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry); +// +//typedef struct { +// char key[TSDB_SUBSCRIBE_KEY_LEN]; +// SArray* logs; // SArray +//} SMqSubActionLogObj; +// +//SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog); +//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog); +//int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog); +//void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); typedef struct { int32_t oldConsumerNum; @@ -634,7 +637,7 @@ typedef struct { SArray* removedConsumers; // SArray SArray* modifyConsumers; // SArray SMqSubscribeObj* pSub; - SMqSubActionLogEntry* pLogEntry; +// SMqSubActionLogEntry* pLogEntry; } SMqRebOutputObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 117c1082a5..f530128572 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -644,7 +644,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SCMSubscribeReq subscribe = {0}; tDeserializeSCMSubscribeReq(msgStr, &subscribe); - uint64_t consumerId = subscribe.consumerId; + int64_t consumerId = subscribe.consumerId; char *cgroup = subscribe.cgroup; SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6dab018236..015cb6d7ce 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -187,14 +187,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); if (pVgEpNew == NULL) return NULL; pVgEpNew->vgId = pVgEp->vgId; - pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); +// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); pVgEpNew->epSet = pVgEp->epSet; return pVgEpNew; } void tDeleteSMqVgEp(SMqVgEp *pVgEp) { if (pVgEp) { - taosMemoryFreeClear(pVgEp->qmsg); +// taosMemoryFreeClear(pVgEp->qmsg); taosMemoryFree(pVgEp); } } @@ -202,14 +202,18 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); - tlen += taosEncodeString(buf, pVgEp->qmsg); +// tlen += taosEncodeString(buf, pVgEp->qmsg); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } -void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { +void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - buf = taosDecodeString(buf, &pVgEp->qmsg); + if(sver == 1){ + uint64_t size = 0; + buf = taosDecodeVariantU64(buf, &size); + buf = POINTER_SHIFT(buf, size); + } buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return (void *)buf; } @@ -395,6 +399,22 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); + int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows); + tlen += taosEncodeFixedI32(buf, szVgs); + for (int32_t j= 0; j < szVgs; ++j) { + OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); + tlen += taosEncodeFixedI32(buf, offRows->vgId); + tlen += taosEncodeFixedI64(buf, offRows->rows); + tlen += taosEncodeFixedI8(buf, offRows->offset.type); + if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { + tlen += taosEncodeFixedI64(buf, offRows->offset.uid); + tlen += taosEncodeFixedI64(buf, offRows->offset.ts); + } else if (offRows->offset.type == TMQ_OFFSET__LOG) { + tlen += taosEncodeFixedI64(buf, offRows->offset.version); + } else { + // do nothing + } + } #if 0 int32_t sz = taosArrayGetSize(pConsumerEp->vgs); tlen += taosEncodeFixedI32(buf, sz); @@ -406,9 +426,9 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { return tlen; } -void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { +void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); + buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); #if 0 int32_t sz; buf = taosDecodeFixedI32(buf, &sz); @@ -419,7 +439,28 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { taosArrayPush(pConsumerEp->vgs, &pVgEp); } #endif - + if (sver > 1){ + int32_t szVgs = 0; + buf = taosDecodeFixedI32(buf, &szVgs); + if(szVgs > 0){ + pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); + if (NULL == pConsumerEp->offsetRows) return NULL; + for (int32_t j= 0; j < szVgs; ++j) { + OffsetRows* offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); + buf = taosDecodeFixedI32(buf, &offRows->vgId); + buf = taosDecodeFixedI64(buf, &offRows->rows); + buf = taosDecodeFixedI8(buf, &offRows->offset.type); + if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { + buf = taosDecodeFixedI64(buf, &offRows->offset.uid); + buf = taosDecodeFixedI64(buf, &offRows->offset.ts); + } else if (offRows->offset.type == TMQ_OFFSET__LOG) { + buf = taosDecodeFixedI64(buf, &offRows->offset.version); + } else { + // do nothing + } + } + } + } return (void *)buf; } @@ -468,7 +509,9 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); + pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); + pSubNew->qmsg = taosStrdup(pSub->qmsg); return pSubNew; } @@ -482,6 +525,8 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { } taosHashCleanup(pSub->consumerHash); taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); + taosMemoryFreeClear(pSub->qmsg); + taosArrayDestroy(pSub->offsetRows); } int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { @@ -508,10 +553,27 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { if (cnt != sz) return -1; tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeString(buf, pSub->dbName); + int32_t szVgs = taosArrayGetSize(pSub->offsetRows); + tlen += taosEncodeFixedI32(buf, szVgs); + for (int32_t j= 0; j < szVgs; ++j) { + OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j); + tlen += taosEncodeFixedI32(buf, offRows->vgId); + tlen += taosEncodeFixedI64(buf, offRows->rows); + tlen += taosEncodeFixedI8(buf, offRows->offset.type); + if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { + tlen += taosEncodeFixedI64(buf, offRows->offset.uid); + tlen += taosEncodeFixedI64(buf, offRows->offset.ts); + } else if (offRows->offset.type == TMQ_OFFSET__LOG) { + tlen += taosEncodeFixedI64(buf, offRows->offset.version); + } else { + // do nothing + } + } + tlen += taosEncodeString(buf, pSub->qmsg); return tlen; } -void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { +void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { // buf = taosDecodeStringTo(buf, pSub->key); buf = taosDecodeFixedI64(buf, &pSub->dbUid); @@ -526,74 +588,97 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); for (int32_t i = 0; i < sz; i++) { SMqConsumerEp consumerEp = {0}; - buf = tDecodeSMqConsumerEp(buf, &consumerEp); + buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver); taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)); } - buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); + buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeStringTo(buf, pSub->dbName); + if (sver > 1){ + int32_t szVgs = 0; + buf = taosDecodeFixedI32(buf, &szVgs); + if(szVgs > 0){ + pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); + if (NULL == pSub->offsetRows) return NULL; + for (int32_t j= 0; j < szVgs; ++j) { + OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1); + buf = taosDecodeFixedI32(buf, &offRows->vgId); + buf = taosDecodeFixedI64(buf, &offRows->rows); + buf = taosDecodeFixedI8(buf, &offRows->offset.type); + if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { + buf = taosDecodeFixedI64(buf, &offRows->offset.uid); + buf = taosDecodeFixedI64(buf, &offRows->offset.ts); + } else if (offRows->offset.type == TMQ_OFFSET__LOG) { + buf = taosDecodeFixedI64(buf, &offRows->offset.version); + } else { + // do nothing + } + } + } + buf = taosDecodeString(buf, &pSub->qmsg); + } return (void *)buf; } -SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { - SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); - if (pEntryNew == NULL) return NULL; - pEntryNew->epoch = pEntry->epoch; - pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp); - return pEntryNew; -} - -void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { - taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); -} - -int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { - int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pEntry->epoch); - tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry); - return tlen; -} - -void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) { - buf = taosDecodeFixedI32(buf, &pEntry->epoch); - buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); - return (void *)buf; -} - -SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { - SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); - if (pLogNew == NULL) return pLogNew; - memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); - pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp); - return pLogNew; -} - -void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { - taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); -} - -int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pLog->key); - tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry); - return tlen; -} - -void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { - buf = taosDecodeStringTo(buf, pLog->key); - buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); - return (void *)buf; -} - -int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pOffset->key); - tlen += taosEncodeFixedI64(buf, pOffset->offset); - return tlen; -} - -void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { - buf = taosDecodeStringTo(buf, pOffset->key); - buf = taosDecodeFixedI64(buf, &pOffset->offset); - return buf; -} +//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { +// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); +// if (pEntryNew == NULL) return NULL; +// pEntryNew->epoch = pEntry->epoch; +// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp); +// return pEntryNew; +//} +// +//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { +// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); +//} +// +//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { +// int32_t tlen = 0; +// tlen += taosEncodeFixedI32(buf, pEntry->epoch); +// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry); +// return tlen; +//} +// +//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) { +// buf = taosDecodeFixedI32(buf, &pEntry->epoch); +// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); +// return (void *)buf; +//} +// +//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { +// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); +// if (pLogNew == NULL) return pLogNew; +// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); +// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp); +// return pLogNew; +//} +// +//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { +// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); +//} +// +//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { +// int32_t tlen = 0; +// tlen += taosEncodeString(buf, pLog->key); +// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry); +// return tlen; +//} +// +//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { +// buf = taosDecodeStringTo(buf, pLog->key); +// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); +// return (void *)buf; +//} +// +//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) { +// int32_t tlen = 0; +// tlen += taosEncodeString(buf, pOffset->key); +// tlen += taosEncodeFixedI64(buf, pOffset->offset); +// return tlen; +//} +// +//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { +// buf = taosDecodeStringTo(buf, pOffset->key); +// buf = taosDecodeFixedI64(buf, &pOffset->offset); +// return buf; +//} diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 64082536da..9a611fe46a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -570,25 +570,21 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); - if (pSubplan) { - int32_t msgLen; - - pSubplan->execNode.epSet = pVgEp->epSet; - pSubplan->execNode.nodeId = pVgEp->vgId; - - if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - } else { - pVgEp->qmsg = taosStrdup(""); - } - sdbRelease(pSdb, pVgroup); } + if (pSubplan) { + int32_t msgLen; + + if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) { + qDestroyQueryPlan(pPlan); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + } else { + pSub->qmsg = taosStrdup(""); + } + qDestroyQueryPlan(pPlan); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 74421afa33..ed1a602cd3 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -24,7 +24,7 @@ #include "tcompare.h" #include "tname.h" -#define MND_SUBSCRIBE_VER_NUMBER 1 +#define MND_SUBSCRIBE_VER_NUMBER 2 #define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_REBALANCE_CNT 3 @@ -99,13 +99,23 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; req.vgId = pRebVg->pVgEp->vgId; - req.qmsg = pRebVg->pVgEp->qmsg; + if(pPlan){ + pPlan->execNode.epSet = pRebVg->pVgEp->epSet; + pPlan->execNode.nodeId = pRebVg->pVgEp->vgId; + int32_t msgLen; + if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + }else{ + req.qmsg = taosStrdup(""); + } req.subType = pSub->subType; req.withMeta = pSub->withMeta; req.suid = pSub->stbUid; @@ -115,6 +125,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri int32_t ret = 0; tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); if (ret < 0) { + taosMemoryFree(req.qmsg); return -1; } @@ -122,6 +133,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(req.qmsg); return -1; } @@ -135,17 +147,19 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { taosMemoryFreeClear(buf); tEncoderClear(&encoder); + taosMemoryFree(req.qmsg); return -1; } tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; + taosMemoryFree(req.qmsg); return 0; } -static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, - const SMqRebOutputVg *pRebVg) { +static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, + const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { // if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { // terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; // return -1; @@ -153,7 +167,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM void *buf; int32_t tlen; - if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) { + if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) { return -1; } @@ -255,7 +269,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI for (int32_t i = 0; i < numOfNewConsumers; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); - SMqConsumerEp newConsumerEp; + SMqConsumerEp newConsumerEp = {0}; newConsumerEp.consumerId = consumerId; newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); @@ -483,14 +497,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { + struct SSubplan* pPlan = NULL; + if(strcmp(pOutput->pSub->qmsg, "") != 0){ + int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { + nodesDestroyNode((SNode*)pPlan); return -1; } mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } @@ -500,11 +525,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); - if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { + if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) { mndTransDrop(pTrans); + nodesDestroyNode((SNode*)pPlan); return -1; } } + nodesDestroyNode((SNode*)pPlan); // 2. redo log: subscribe and vg assignment // subscribe @@ -809,7 +836,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; - if (sver != MND_SUBSCRIBE_VER_NUMBER) { + if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto SUB_DECODE_OVER; } @@ -828,7 +855,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); - if (tDecodeSubscribeObj(buf, pSub) == NULL) { + if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) { goto SUB_DECODE_OVER; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 4ba8d6d69f..b35dc71ed9 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -139,6 +139,7 @@ static STqMgmt tqMgmt = {0}; int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); +void tqDestroyTqHandle(void* data); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); @@ -161,6 +162,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq); int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaRestoreCheckInfo(STQ* pTq); +int32_t tqMetaGetHandle(STQ* pTq, const char* key); +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); STqOffsetStore* tqOffsetOpen(STQ* pTq); void tqOffsetClose(STqOffsetStore*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 731c85e4d0..9c906765ec 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -62,7 +62,7 @@ void tqCleanUp() { } } -static void destroyTqHandle(void* data) { +void tqDestroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); @@ -102,7 +102,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); + taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle); taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); @@ -690,103 +690,33 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return -1; } - SVnode* pVnode = pTq->pVnode; - int32_t vgId = TD_VID(pVnode); - - tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, + tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = NULL; + while(1){ + pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){ + break; + } + } + if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); } - if (req.newConsumerId == -1) { tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); goto end; } - - STqHandle tqHandle = {0}; - pHandle = &tqHandle; - - memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); - pHandle->consumerId = req.newConsumerId; - pHandle->epoch = -1; - - pHandle->execHandle.subType = req.subType; - pHandle->fetchMeta = req.withMeta; - - // TODO version should be assigned and refed during preprocess - SWalRef* pRef = walRefCommittedVer(pVnode->pWal); - if (pRef == NULL) { - ret = -1; + STqHandle handle = {0}; + ret = tqCreateHandle(pTq, &req, &handle); + if(ret < 0){ + tqDestroyTqHandle(&handle); goto end; } - - int64_t ver = pRef->refVer; - pHandle->pRef = pRef; - - SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver}; - initStorageAPI(&handle.api); - - pHandle->snapshotVer = ver; - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg); - - pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, - &pHandle->execHandle.numOfCols, req.newConsumerId); - void* scanner = NULL; - qExtractStreamScanner(pHandle->execHandle.task, &scanner); - pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); - } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); - - pHandle->execHandle.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); - } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.execTb.suid = req.suid; - pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg); - - if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { - if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(), - pVnode->config.vgId, req.subKey, pHandle->consumerId); - return -1; - } - } - - buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, - (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); - - SArray* tbUidList = NULL; - ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); - if (ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, - pHandle->consumerId); - taosArrayDestroy(tbUidList); - goto end; - } - tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, - pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); - pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); - tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL); - taosArrayDestroy(tbUidList); - } - - taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); - goto end; + ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } else { taosWLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index ba6d7cb501..df1c9ca7c9 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -88,9 +88,9 @@ int32_t tqMetaOpen(STQ* pTq) { return -1; } - if (tqMetaRestoreHandle(pTq) < 0) { - return -1; - } +// if (tqMetaRestoreHandle(pTq) < 0) { +// return -1; +// } if (tqMetaRestoreCheckInfo(pTq) < 0) { return -1; @@ -274,6 +274,120 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { return 0; } +static int buildHandle(STQ* pTq, STqHandle* handle){ + SVnode* pVnode = pTq->pVnode; + int32_t vgId = TD_VID(pVnode); + + handle->pRef = walOpenRef(pVnode->pWal); + if (handle->pRef == NULL) { + return -1; + } + walSetRefVer(handle->pRef, handle->snapshotVer); + + SReadHandle reader = { + .vnode = pVnode, + .initTableReader = true, + .initTqReader = true, + .version = handle->snapshotVer, + }; + + initStorageAPI(&reader.api); + + if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + handle->execHandle.task = + qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId); + if (handle->execHandle.task == NULL) { + tqError("cannot create exec task for %s", handle->subKey); + return -1; + } + void* scanner = NULL; + qExtractStreamScanner(handle->execHandle.task, &scanner); + if (scanner == NULL) { + tqError("cannot extract stream scanner for %s", handle->subKey); + return -1; + } + handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); + if (handle->execHandle.pTqReader == NULL) { + tqError("cannot extract exec reader for %s", handle->subKey); + return -1; + } + } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) { + handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + handle->execHandle.pTqReader = tqReaderOpen(pVnode); + + buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta, + (SSnapContext**)(&reader.sContext)); + handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); + } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + handle->pWalReader = walOpenReader(pVnode->pWal, NULL); + + if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { + if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { + tqError("nodesStringToNode error in sub stable, since %s", terrstr()); + return -1; + } + } + buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType, + handle->fetchMeta, (SSnapContext**)(&reader.sContext)); + handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); + + SArray* tbUidList = NULL; + int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, handle->execHandle.task); + if(ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId); + taosArrayDestroy(tbUidList); + return -1; + } + tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); + handle->execHandle.pTqReader = tqReaderOpen(pVnode); + tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL); + taosArrayDestroy(tbUidList); + } + return 0; +} + +static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ + int32_t vgId = TD_VID(pTq->pVnode); + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, handle); + tDecoderClear(&decoder); + + if(buildHandle(pTq, handle) < 0){ + return -1; + } + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); +} + +int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ + int32_t vgId = TD_VID(pTq->pVnode); + + memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); + handle->consumerId = req->newConsumerId; + handle->epoch = -1; + + handle->execHandle.subType = req->subType; + handle->fetchMeta = req->withMeta; + if(req->subType == TOPIC_SUB_TYPE__COLUMN){ + handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg); + }else if(req->subType == TOPIC_SUB_TYPE__DB){ + handle->execHandle.execDb.pFilterOutTbUid = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + }else if(req->subType == TOPIC_SUB_TYPE__TABLE){ + handle->execHandle.execTb.suid = req->suid; + handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); + } + + handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); + + if(buildHandle(pTq, handle) < 0){ + return -1; + } + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); +} + int32_t tqMetaRestoreHandle(STQ* pTq) { int code = 0; TBC* pCur = NULL; @@ -281,97 +395,40 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { return -1; } - int32_t vgId = TD_VID(pTq->pVnode); void* pKey = NULL; int kLen = 0; void* pVal = NULL; int vLen = 0; - SDecoder decoder; tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { STqHandle handle = {0}; - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSTqHandle(&decoder, &handle); - tDecoderClear(&decoder); - - handle.pRef = walOpenRef(pTq->pVnode->pWal); - if (handle.pRef == NULL) { - code = -1; - goto end; + code = restoreHandle(pTq, pVal, vLen, &handle); + if (code < 0) { + tqDestroyTqHandle(&handle); + break; } - walSetRefVer(handle.pRef, handle.snapshotVer); - - SReadHandle reader = { - .vnode = pTq->pVnode, - .initTableReader = true, - .initTqReader = true, - .version = handle.snapshotVer - }; - - initStorageAPI(&reader.api); - - if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - handle.execHandle.task = - qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); - if (handle.execHandle.task == NULL) { - tqError("cannot create exec task for %s", handle.subKey); - code = -1; - goto end; - } - void* scanner = NULL; - qExtractStreamScanner(handle.execHandle.task, &scanner); - if (scanner == NULL) { - tqError("cannot extract stream scanner for %s", handle.subKey); - code = -1; - goto end; - } - handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); - if (handle.execHandle.pTqReader == NULL) { - tqError("cannot extract exec reader for %s", handle.subKey); - code = -1; - goto end; - } - } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); - - buildSnapContext(reader.vnode, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, - (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); - } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - - if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) { - if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) { - tqError("nodesStringToNode error in sub stable, since %s", terrstr()); - return -1; - } - } - buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, - handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); - - SArray* tbUidList = NULL; - int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task); - if(ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); - taosArrayDestroy(tbUidList); - goto end; - } - tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); - handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); - tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL); - taosArrayDestroy(tbUidList); - } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); - taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); } -end: tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); return code; } + +int32_t tqMetaGetHandle(STQ* pTq, const char* key) { + void* pVal = NULL; + int vLen = 0; + + if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) { + return -1; + } + STqHandle handle = {0}; + int code = restoreHandle(pTq, pVal, vLen, &handle); + if (code < 0){ + tqDestroyTqHandle(&handle); + } + tdbFree(pVal); + return code; +} diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 331a2fa7ab..79a0e8563c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -26,8 +26,8 @@ #include "executil.h" #include "executorInt.h" #include "querytask.h" -#include "tcompression.h" #include "storageapi.h" +#include "tcompression.h" typedef struct tagFilterAssist { SHashObj* colHash; @@ -42,13 +42,13 @@ typedef enum { } FilterCondType; static FilterCondType checkTagCond(SNode* cond); -static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI); -static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI); +static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI); +static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI); -static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, - SNode* pTagIndexCond, STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); -static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, - void* pVnode, SStorageAPI* pStorageAPI); +static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, + STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); +static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, + SStorageAPI* pStorageAPI); static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } @@ -302,7 +302,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } -int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI) { +int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI) { int32_t code = TSDB_CODE_SUCCESS; SMetaReader mr = {0}; @@ -495,7 +495,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf genTbGroupDigest((SNode*)listNode, digest, &context); nodesFree(listNode); - pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList); + pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), + &tableList); if (tableList) { taosArrayDestroy(pTableListInfo->pTableList); pTableListInfo->pTableList = tableList; @@ -632,7 +633,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf if (tsTagFilterCache) { tableList = taosArrayDup(pTableListInfo->pTableList, NULL); - pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo)); + pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), + tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo)); } // int64_t st2 = taosGetTimestampUs(); @@ -776,7 +778,8 @@ static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SN } // only return uid that does not contained in pExistedUidList -static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI) { +static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, + SStorageAPI* pStoreAPI) { if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) { return -1; } @@ -839,8 +842,8 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S return -1; } -static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, - void* pVnode, SStorageAPI* pStorageAPI) { +static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, + SStorageAPI* pStorageAPI) { SSDataBlock* pResBlock = createDataBlock(); if (pResBlock == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1080,8 +1083,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S genTagFilterDigest(pTagCond, &context); bool acquired = false; - pStorageAPI->metaFn.getCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pUidList, - &acquired); + pStorageAPI->metaFn.getCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), + pUidList, &acquired); if (acquired) { digest[0] = 1; memcpy(digest + 1, context.digest, tListLen(context.digest)); @@ -1097,13 +1100,15 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S if (pTagIndexCond) { void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode); - SIndexMetaArg metaArg = { - .metaEx = pVnode, .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode), .ivtIdx = pIndex, .suid = pScanNode->uid}; + SIndexMetaArg metaArg = {.metaEx = pVnode, + .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode), + .ivtIdx = pIndex, + .suid = pScanNode->uid}; status = SFLT_NOT_INDEX; code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter); if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake - qWarn("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid); + qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid); code = TSDB_CODE_SUCCESS; } else { qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); @@ -1128,7 +1133,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t)); } -// metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1); + // metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, + // size, 1); digest[0] = 1; memcpy(digest + 1, context.digest, tListLen(context.digest)); } @@ -1152,15 +1158,17 @@ _end: return code; } -int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo){ - SSubplan *pSubplan = (SSubplan *)node; +int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) { + SSubplan* pSubplan = (SSubplan*)node; SScanPhysiNode pNode = {0}; pNode.suid = suid; pNode.uid = suid; pNode.tableType = TSDB_SUPER_TABLE; STableListInfo* pTableListInfo = tableListCreate(); - uint8_t digest[17] = {0}; - int code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI); + uint8_t digest[17] = {0}; + int code = + getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, + pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI); *tableList = pTableListInfo->pTableList; pTableListInfo->pTableList = NULL; tableListDestroy(pTableListInfo); @@ -1181,7 +1189,7 @@ size_t getTableTagsBufLen(const SNodeList* pGroups) { } int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, - SStorageAPI* pAPI) { + SStorageAPI* pAPI) { SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pVnode, 0, &pAPI->metaFn); @@ -1560,7 +1568,8 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_SUCCESS; } -SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, SFunctionStateStore* pStore) { +SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, + SFunctionStateStore* pStore) { SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); if (pFuncCtx == NULL) { return NULL; @@ -1849,7 +1858,7 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde } struct tm tm; - time_t t = (time_t) key; + time_t t = (time_t)key; taosLocalTime(&t, &tm, NULL); int mon = (int)(tm.tm_year * 12 + tm.tm_mon + duration * factor); @@ -2079,8 +2088,8 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { return TSDB_CODE_SUCCESS; } -int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode, SNodeList* group, - bool groupSort, uint8_t *digest, SStorageAPI* pAPI) { +int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode, + SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI) { int32_t code = TSDB_CODE_SUCCESS; bool groupByTbname = groupbyTbname(group); @@ -2132,7 +2141,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } uint8_t digest[17] = {0}; - int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr, &pTaskInfo->storageAPI); + int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr, + &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { qError("failed to getTableList, code: %s", tstrerror(code)); return code; @@ -2150,7 +2160,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags return TSDB_CODE_SUCCESS; } - code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI); + code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, + &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index 2a8f60d4d2..6d23f65cf3 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -51,6 +51,7 @@ extern "C" { #define FUNC_MGT_CUMULATIVE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(22) #define FUNC_MGT_INTERP_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(23) #define FUNC_MGT_GEOMETRY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(24) +#define FUNC_MGT_FORBID_SYSTABLE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(25) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 657b02c205..6eb2be34b3 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2348,7 +2348,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "leastsquares", .type = FUNCTION_TYPE_LEASTSQUARES, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateLeastSQR, .getEnvFunc = getLeastSQRFuncEnv, .initFunc = leastSQRFunctionSetup, @@ -2456,7 +2456,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "top", .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, @@ -2471,7 +2472,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, @@ -2528,7 +2530,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "elapsed", .type = FUNCTION_TYPE_ELAPSED, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED, .dataRequiredFunc = statisDataRequired, .translateFunc = translateElapsed, .getEnvFunc = getElapsedFuncEnv, @@ -2568,7 +2571,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "interp", .type = FUNCTION_TYPE_INTERP, .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC|FUNC_MGT_KEEP_ORDER_FUNC, + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateInterp, .getEnvFunc = getSelectivityFuncEnv, .initFunc = functionSetup, @@ -2580,7 +2583,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "derivative", .type = FUNCTION_TYPE_DERIVATIVE, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateDerivative, .getEnvFunc = getDerivativeFuncEnv, .initFunc = derivativeFuncSetup, @@ -2592,7 +2595,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "irate", .type = FUNCTION_TYPE_IRATE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateIrate, .getEnvFunc = getIrateFuncEnv, .initFunc = irateFuncSetup, @@ -2603,7 +2607,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2618,7 +2623,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_cache_last_row", .type = FUNCTION_TYPE_CACHE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2628,7 +2634,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_cache_last", .type = FUNCTION_TYPE_CACHE_LAST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2638,7 +2644,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_row_partial", .type = FUNCTION_TYPE_LAST_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLastPartial, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2649,7 +2656,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_row_merge", .type = FUNCTION_TYPE_LAST_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2659,7 +2667,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "first", .type = FUNCTION_TYPE_FIRST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = firstDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2674,7 +2683,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_first_partial", .type = FUNCTION_TYPE_FIRST_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLastPartial, .dynDataRequiredFunc = firstDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2686,7 +2696,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_first_merge", .type = FUNCTION_TYPE_FIRST_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2697,7 +2708,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last", .type = FUNCTION_TYPE_LAST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2712,7 +2724,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_partial", .type = FUNCTION_TYPE_LAST_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLastPartial, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2724,7 +2737,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_last_merge", .type = FUNCTION_TYPE_LAST_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2735,7 +2749,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "twa", .type = FUNCTION_TYPE_TWA, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | + FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateInNumOutDou, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getTwaFuncEnv, @@ -2826,7 +2841,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "diff", .type = FUNCTION_TYPE_DIFF, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, @@ -2839,7 +2854,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "statecount", .type = FUNCTION_TYPE_STATE_COUNT, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC, + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateStateCount, .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, @@ -2851,7 +2866,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "stateduration", .type = FUNCTION_TYPE_STATE_DURATION, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC, + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateStateDuration, .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, @@ -2863,7 +2878,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "csum", .type = FUNCTION_TYPE_CSUM, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateCsum, .getEnvFunc = getCsumFuncEnv, .initFunc = functionSetup, @@ -2876,7 +2891,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "mavg", .type = FUNCTION_TYPE_MAVG, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC, + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, .translateFunc = translateMavg, .getEnvFunc = getMavgFuncEnv, .initFunc = mavgFunctionSetup, @@ -2887,7 +2902,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "sample", .type = FUNCTION_TYPE_SAMPLE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_FILL_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | + FUNC_MGT_FORBID_FILL_FUNC, .translateFunc = translateSample, .getEnvFunc = getSampleFuncEnv, .initFunc = sampleFunctionSetup, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 18f6e8050b..3afa6e9b54 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -219,6 +219,8 @@ bool fmIsKeepOrderFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, F bool fmIsCumulativeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_CUMULATIVE_FUNC); } +bool fmIsForbidSysTableFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_SYSTABLE_FUNC); } + bool fmIsInterpFunc(int32_t funcId) { if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { return false; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f5454dba23..23b1e14222 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1660,6 +1660,20 @@ static int32_t translateForbidStreamFunc(STranslateContext* pCxt, SFunctionNode* return TSDB_CODE_SUCCESS; } +static int32_t translateForbidSysTableFunc(STranslateContext* pCxt, SFunctionNode* pFunc) { + if (!fmIsForbidSysTableFunc(pFunc->funcId)) { + return TSDB_CODE_SUCCESS; + } + + SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; + SNode* pTable = pSelect->pFromTable; + if (NULL != pTable && QUERY_NODE_REAL_TABLE == nodeType(pTable) && + TSDB_SYSTEM_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, pFunc->functionName); + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* pFunc) { if (!fmIsRepeatScanFunc(pFunc->funcId)) { return TSDB_CODE_SUCCESS; @@ -1891,6 +1905,9 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SFunctionNode* p if (TSDB_CODE_SUCCESS == code) { code = translateForbidStreamFunc(pCxt, pFunc); } + if (TSDB_CODE_SUCCESS == code) { + code = translateForbidSysTableFunc(pCxt, pFunc); + } if (TSDB_CODE_SUCCESS == code) { code = translateRepeatScanFunc(pCxt, pFunc); } @@ -2648,7 +2665,7 @@ static int32_t replaceTbName(STranslateContext* pCxt, SSelectStmt* pSelect) { SNode** pNode = NULL; SRewriteTbNameContext pRewriteCxt = {0}; pRewriteCxt.pTbName = pTable->table.tableName; - + nodesRewriteExprPostOrder(&pSelect->pWhere, doTranslateTbName, &pRewriteCxt); return pRewriteCxt.errCode; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index ba8392a48c..719d4630b5 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -170,6 +170,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s function is not supported in stream query"; case TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC: return "%s function is not supported in group query"; + case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC: + return "%s function is not supported in system table query"; case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN: diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 4b8009347d..8b1d970bde 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -247,8 +247,7 @@ static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataO return TSDB_CODE_PLAN_INTERNAL_ERROR; } pPart->node.resultDataOrder = requirement; - pPart->node.requireDataOrder = - (requirement >= DATA_ORDER_LEVEL_IN_BLOCK ? DATA_ORDER_LEVEL_GLOBAL : DATA_ORDER_LEVEL_NONE); + pPart->node.requireDataOrder = requirement; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index eb36389f1d..b7169dec53 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -80,27 +80,3 @@ void walRefLastVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexUnlock(&pWal->mutex); wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); } - -SWalRef *walRefCommittedVer(SWal *pWal) { - SWalRef *pRef = walOpenRef(pWal); - if (pRef == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - taosThreadMutexLock(&pWal->mutex); - - int64_t ver = walGetCommittedVer(pWal); - - wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); - - pRef->refVer = ver; - // bsearch in fileSet - SWalFileInfo tmpInfo; - tmpInfo.firstVer = ver; - SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); - // pRef->refFile = pRet->firstVer; - - taosThreadMutexUnlock(&pWal->mutex); - return pRef; -} diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 6c7c5ddb0d..8906391a9a 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -476,13 +476,13 @@ int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) { return tlen; } -void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz) { +void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver) { int32_t sz; buf = taosDecodeFixedI32(buf, &sz); *pArray = taosArrayInit(sz, sizeof(void*)); for (int32_t i = 0; i < sz; i++) { void* data = taosMemoryCalloc(1, dataSz); - buf = decode(buf, data); + buf = decode(buf, data, sver); taosArrayPush(*pArray, &data); } return (void*)buf; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9abdeac877..e4a00c1fc9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -554,7 +554,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COL_JSON, "Only tag can be jso TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VALUE_TOO_LONG, "Value too long for column/tag") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_DELETE_WHERE, "The DELETE statement must have a definite time window range") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG, "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC, "Fill now allowed") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC, "Fill not allowed") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_WINDOW_PC, "Invalid windows pc") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC, "Window not allowed") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC, "Stream not allowed") @@ -566,6 +566,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SMA_INDEX, "Invalid sma index") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SELECTED_EXPR, "Invalid SELECTed expression") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GET_META_ERROR, "Fail to get table info") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, "Not unique table/alias") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not allowed") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 23f660030a..654e3508cc 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -449,6 +449,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py @@ -812,6 +813,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py diff --git a/tests/system-test/2-query/systable_func.py b/tests/system-test/2-query/systable_func.py new file mode 100644 index 0000000000..77ee02e4b1 --- /dev/null +++ b/tests/system-test/2-query/systable_func.py @@ -0,0 +1,60 @@ +import taos +import sys + +from util.log import * +from util.sql import * +from util.cases import * + + + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + #tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def run(self): + tdSql.prepare() + + tdSql.query(f"select count(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select sum(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select min(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select max(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select stddev(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select avg(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select apercentile(`columns`, 50) from `information_schema`.`ins_tables`;") + tdSql.query(f"select top(`columns`, 3) from `information_schema`.`ins_tables`;") + tdSql.query(f"select bottom(`columns`, 3) from `information_schema`.`ins_tables`;") + tdSql.query(f"select spread(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select histogram(`columns`, 'user_input', '[1, 3, 5]', 0) from `information_schema`.`ins_tables`;") + tdSql.query(f"select hyperloglog(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select sample(`columns`, 3) from `information_schema`.`ins_tables`;") + tdSql.query(f"select tail(`columns`, 3) from `information_schema`.`ins_tables`;") + tdSql.query(f"select unique(`columns`) from `information_schema`.`ins_tables`;") + tdSql.query(f"select mode(`columns`) from `information_schema`.`ins_tables`;") + + tdSql.error(f"select leastsquares(`columns`, 1, 1) from `information_schema`.`ins_tables`;") + tdSql.error(f"select elapsed(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select interp(`columns`) from `information_schema`.`ins_tables` range(0, 1) every(1s) fill(null);") + tdSql.error(f"select percentile(`columns`, 50) from `information_schema`.`ins_tables`;") + tdSql.error(f"select derivative(`columns`, 1s, 0) from `information_schema`.`ins_tables`;") + tdSql.error(f"select irate(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select last_row(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select last(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select first(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select twa(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select diff(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select statecount(`columns`, 'GE', 0) from `information_schema`.`ins_tables`;") + tdSql.error(f"select stateduration(`columns`, 'GE', 0, 1s) from `information_schema`.`ins_tables`;") + tdSql.error(f"select csum(`columns`) from `information_schema`.`ins_tables`;") + tdSql.error(f"select mavg(`columns`, 1) from `information_schema`.`ins_tables`;") + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())