From 185071fae9175246d40127cede03d26f289e52fa Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 11 Jul 2023 19:34:03 +0800 Subject: [PATCH] enh: add group cache func --- include/libs/nodes/plannodes.h | 1 + include/util/taoserror.h | 1 + source/libs/executor/inc/groupcache.h | 36 +- source/libs/executor/src/groupcacheoperator.c | 351 ++++++++++++++---- source/util/src/terror.c | 1 + 5 files changed, 317 insertions(+), 73 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 0ffe06048c..1935e42f56 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -443,6 +443,7 @@ typedef struct SGroupCachePhysiNode { SPhysiNode node; bool grpColsMayBeNull; SArray* pDownstreamKey; + bool grpByUid; SNodeList* pGroupCols; } SGroupCachePhysiNode; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0cd73f2d9a..549ea1463e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -517,6 +517,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x072F) #define TSDB_CODE_QRY_QWORKER_QUIT TAOS_DEF_ERROR_CODE(0, 0x0730) #define TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x0731) +#define TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0732) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index c38c083b19..4b3f3d3a3a 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -37,9 +37,10 @@ typedef struct SGcBufPageInfo { typedef struct SGroupCacheData { TdThreadMutex mutex; - SSHashObj* waitQueue; + SArray* waitQueue; bool fetchDone; - int64_t fetchSessionId; + bool needCache; + SSDataBlock* pBlock; SGcBlkBufInfo* pFirstBlk; SGcBlkBufInfo* pLastBlk; } SGroupCacheData; @@ -60,11 +61,26 @@ typedef struct SGroupColsInfo { char* pData; } SGroupColsInfo; +typedef struct SGcNewGroupInfo { + int64_t uid; + SOperatorParam* pParam; +} SGcNewGroupInfo; + +typedef struct SGcDownstreamCtx { + SRWLatch lock; + int64_t fetchSessionId; + SArray* pNewGrpList; // SArray + SArray* pGrpUidList; +} SGcDownstreamCtx; + typedef struct SGcSessionCtx { - int32_t downstreamIdx; - bool needCache; - SGroupCacheData* pGroupData; - SGcBlkBufInfo* pLastBlk; + int32_t downstreamIdx; + bool needCache; + SGcOperatorParam* pParam; + SGroupCacheData* pGroupData; + SGcBlkBufInfo* pLastBlk; + bool semInit; + tsem_t waitSem; } SGcSessionCtx; typedef struct SGcExecInfo { @@ -72,9 +88,17 @@ typedef struct SGcExecInfo { int64_t* pDownstreamBlkNum; } SGcExecInfo; +typedef struct SGcNewGroupInfo { + int64_t uid; +} SGcNewGroupInfo; + typedef struct SGroupCacheOperatorInfo { + TdThreadMutex sessionMutex; + SGcNewGroupInfo newGroup; SSHashObj* pSessionHash; SGroupColsInfo groupColsInfo; + bool grpByUid; + SGcDownstreamCtx* pDownstreams; SArray* pBlkBufs; SHashObj* pBlkHash; SGcExecInfo execInfo; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index c664d2d7fd..ec352562f4 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -110,29 +110,218 @@ static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo return pPage->data + pBlkInfo->offset; } -static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { +static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx]; + + taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); + + if (NULL == taosArrayPush(pCtx->pGrpUidList, &pGCache->newGroup.uid)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pGCache->newGroup.uid = 0; + taosThreadMutexUnlock(&pGCache->sessionMutex); + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; - if (NULL == pSession->pLastBlk) { - if (pSession->pGroupData->pFirstBlk) { - *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk); - pSession->pLastBlk = pSession->pGroupData->pFirstBlk; - return TSDB_CODE_SUCCESS; - } - - if (atomic_load_64(&pSession->pGroupData->fetchSessionId) == sessionId) { - getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes); + if (pGCache->pDownstreams[downstreamIdx].pNewGrpList) { + code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pGCache->newGroup.uid); + if (code) { + return code; } } - SGcBlkBufInfo* pCurr = (*ppLastBlk)->next; - *ppLastBlk = pCurr; - if (pCurr) { - SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId); - return pPage->data + pCurr->offset; + SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, downstreamIdx); + if (pBlock) { + pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; + } + + *ppRes = pBlock; + + return TSDB_CODE_SUCCESS; +} + +static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) { + *ppRes = pBlock; +} + +static void notifyWaitingSessions(SArray* pWaitQueue) { + if (NULL == pWaitQueue || taosArrayGetSize(pWaitQueue) <= 0) { + return; + } + + int32_t n = taosArrayGetSize(pWaitQueue); + for (int32_t i = 0; i < n; ++i) { + SGcSessionCtx* pSession = taosArrayGetP(pWaitQueue, i); + tsem_post(&pSession->waitSem); + } +} + +int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) { + int32_t code = TSDB_CODE_SUCCESS; + SGroupCacheOperatorInfo* pGCache = pOperator->info; + if (pGCache->grpByUid) { + SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); + pGroup->pBlock = pBlock; + + if (pGroup->needCache) { + SGcBlkBufInfo* pNewBlk = NULL; + code = addBlkToGroupCache(pOperator, pBlock, &pNewBlk); + if (code) { + return code; + } + + if (pGroup->pLastBlk) { + pGroup->pLastBlk->next = pNewBlk; + pGroup->pLastBlk = pNewBlk; + } else { + pGroup->pFirstBlk = pNewBlk; + pGroup->pLastBlk = pNewBlk; + } + } + + notifyWaitingSessions(pGroup->waitQueue); + if (pGroup == pSession->pGroupData) { + *continueFetch = false; + } + + return TSDB_CODE_SUCCESS; } - return NULL; + return TSDB_CODE_INVALID_PARA; +} + +static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) { + notifyWaitingSessions(); +} + +static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + bool continueFetch = true; + int32_t code = TSDB_CODE_SUCCESS; + + while (continueFetch && TSDB_CODE_SUCCESS == code) { + int32_t code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + if (NULL == *ppRes) { + code = handleDownstreamFetchDone(pOperator, pSession); + break; + } else { + code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch); + } + } + + return code; +} + +static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGroup, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + if (NULL == pGroup->waitQueue) { + pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES); + if (NULL == pGroup->waitQueue) { + taosThreadMutexUnlock(&pSession->pGroupData->mutex); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + taosArrayPush(pGroup->waitQueue, &pSession); + + if (!pSession->semInit) { + tsem_init(&pSession->waitSem, 0, 0); + pSession->semInit = true; + } + + taosThreadMutexUnlock(&pSession->pGroupData->mutex); + + tsem_wait(&pSession->waitSem); + + if (pSession->pGroupData->needCache) { + if (NULL == pSession->pLastBlk) { + if (pSession->pGroupData->pFirstBlk) { + *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk); + pSession->pLastBlk = pSession->pGroupData->pFirstBlk; + return TSDB_CODE_SUCCESS; + } else if (pGroup->fetchDone) { + *ppRes = NULL; + return TSDB_CODE_SUCCESS; + } + } else if (pSession->pLastBlk->next) { + *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next); + pSession->pLastBlk = pSession->pLastBlk->next; + return TSDB_CODE_SUCCESS; + } + } else { + *ppRes = pSession->pGroupData->pBlock; + return TSDB_CODE_SUCCESS; + } + + qError("no block retrieved from downstream and waked up"); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; +} + +static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + SGroupCacheOperatorInfo* pGCache = pOperator->info; + bool locked = false; + + while (true) { + if (pSession->pGroupData->needCache) { + if (NULL == pSession->pLastBlk) { + if (pSession->pGroupData->pFirstBlk) { + *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk); + pSession->pLastBlk = pSession->pGroupData->pFirstBlk; + goto _return; + } + } else if (pSession->pLastBlk->next) { + *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next); + pSession->pLastBlk = pSession->pLastBlk->next; + goto _return; + } + } else if (pSession->pGroupData->pBlock || pSession->pGroupData->fetchDone) { + *ppRes = pSession->pGroupData->pBlock; + pSession->pGroupData->pBlock = NULL; + goto _return; + } + + if ((atomic_load_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId) == sessionId) + || (-1 == atomic_val_compare_exchange_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId, -1, sessionId))) { + if (locked) { + taosThreadMutexUnlock(&pSession->pGroupData->mutex); + locked = false; + } + + code = getCacheBlkFromDownstreamOperator(pOperator, pSession, ppRes); + goto _return; + } + + if (locked) { + code = groupCacheSessionWait(pGCache, pSession->pGroupData, pSession, ppRes); + locked = false; + if (TSDB_CODE_SUCCESS != code) { + goto _return; + } + + break; + } + + taosThreadMutexLock(&pSession->pGroupData->mutex); + locked = true; + }; + + +_return: + + if (locked) { + taosThreadMutexUnlock(&pSession->pGroupData->mutex); + } + + return code; } @@ -145,9 +334,11 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { return addPageToGroupCacheBuf(pInfo->pBlkBufs); } -static int32_t initGroupCacheGroupData(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) { +static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheData grpData = {0}; - grpData.fetchSessionId = pParam->sessionId; + grpData.needCache = pParam->needCache; + while (true) { if (0 != taosHashPut(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize, &grpData, sizeof(grpData))) { if (terrno == TSDB_CODE_DUP_KEY) { @@ -162,6 +353,16 @@ static int32_t initGroupCacheGroupData(SGroupCacheOperatorInfo* pGCache, SGcOper *ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); if (*ppGrp) { + SGcNewGroupInfo newGroup; + newGroup.uid = *(int64_t*)pParam->pGroupValue; + newGroup.pParam = pOperator->pDownstreamParams[pParam->downstreamIdx]; + taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); + if (NULL == taosArrayPush(pGCache->pDownstreams[pParam->downstreamIdx].pNewGrpList, &newGroup)) { + taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); + return TSDB_CODE_OUT_OF_MEMORY; + } + taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); + break; } } @@ -177,14 +378,15 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato if (pGroup) { ctx.pGroupData = pGroup; } else { - code = initGroupCacheGroupData(pGCache, pParam, &ctx.pGroupData); + code = initGroupCacheGroupData(pOperator, pParam, &ctx.pGroupData); if (TSDB_CODE_SUCCESS != code) { return code; } } + + taosThreadMutexUnlock(&pGCache->sessionMutex); - ctx.downstreamIdx = pParam->downstreamIdx; - ctx.needCache = pParam->needCache; + ctx.pParam = pParam; int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx)); if (TSDB_CODE_SUCCESS != code) { @@ -196,19 +398,20 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato return TSDB_CODE_SUCCESS; } -static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); - if (NULL == pCtx) { - int32_t code = initGroupCacheSession(pOperator, pParam, ppSession); +static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGcOperatorParam* pGcParam = pOperator->pOperatorParam->value; + SGcSessionCtx* pSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + if (NULL == pSession) { + int32_t code = initGroupCacheSession(pOperator, pGcParam, &pSession); if (TSDB_CODE_SUCCESS != code) { return code; } } else { - *ppSession = pCtx; + taosThreadMutexUnlock(&pGCache->sessionMutex); } - return getBlkFromSessionCacheImpl(pOperator, pParam->sessionId, *ppSession, ppRes); + return getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes); } static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) { @@ -223,48 +426,16 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo *pCurrentId = 0; } -static void setCurrentGroupCacheDone(struct SOperatorInfo* pOperator) { - SGroupCacheOperatorInfo* pGCache = pOperator->info; - //destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId); -} - -static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) { - *ppRes = pBlock; -} - -SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { - SGroupCacheOperatorInfo* pGCache = pOperator->info; +SSDataBlock* getBlkFromGroupCache(struct SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam->value; - SGcSessionCtx* pSession = NULL; SSDataBlock* pRes = NULL; - int32_t code = getBlkFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession); + + int32_t code = getBlkFromSessionCache(pOperator, &pRes); if (TSDB_CODE_SUCCESS != code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - if (pRes) { - return pRes; - } - - while (true) { - SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, pSession->downstreamIdx); - if (NULL == pBlock) { - setCurrentGroupCacheDone(pOperator); - break; - } - - pGCache->execInfo.pDownstreamBlkNum[pSession->downstreamIdx]++; - - if (pSession->needCache) { - addBlkToGroupCache(pOperator, pBlock, &pRes); - } else { - pRes = pBlock; - } - break; - } - return pRes; } @@ -278,6 +449,39 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { + SGroupCacheOperatorInfo* pInfo = pOperator->info; + pInfo->pDownstreams = taosMemoryMalloc(pOperator->numOfDownstream * sizeof(*pInfo->pDownstreams)); + if (NULL == pInfo->pDownstreams) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + pInfo->pDownstreams[i].fetchSessionId = -1; + pInfo->pDownstreams[i].pGrpUidList = taosArrayInit(10, sizeof(int64_t)); + if (NULL == pInfo->pDownstreams[i].pGrpUidList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + + taosThreadMutexLock(&pGCache->sessionMutex); + + int32_t code = setOperatorParams(pOperator, pParam); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } + + return getBlkFromGroupCache(pOperator); +} + + SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo)); @@ -293,9 +497,17 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); - code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols); - if (code) { - goto _error; + pInfo->grpByUid = pPhyciNode->grpByUid; + if (!pInfo->grpByUid) { + qError("only group cache by uid is supported now"); + return TSDB_CODE_INVALID_PARA; + } + + if (pPhyciNode->pGroupCols) { + code = initGroupColsInfo(&pInfo->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols); + if (code) { + goto _error; + } } code = initGroupCacheBufPages(pInfo); @@ -320,12 +532,17 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } + code = initGroupCacheDownstreamCtx(pOperator); + if (TSDB_CODE_SUCCESS != code) { + goto _error; + } + code = initGroupCacheExecInfo(pOperator); if (TSDB_CODE_SUCCESS != code) { goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlkFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL); return pOperator; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 7d3859e04a..51d862e99b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -406,6 +406,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in g TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitting") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR, "Geometry not support in this operator") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")