diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 59bba315f4..52d8a75ee0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -415,6 +415,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) #define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) #define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530) +#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a58301adf2..8c8775548d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,7 +65,6 @@ typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; -typedef struct SQueryNode SQueryNode; typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; typedef struct SDiskDataBuilder SDiskDataBuilder; @@ -209,13 +208,11 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable -typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle); - int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); -void tsdbMemTableDestroy(SMemTable *pMemTable); +void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode); -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode); +int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); @@ -293,8 +290,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== -int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap); -void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap); +int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); +void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); // tsdbMerge.c ============================================================================================== int32_t tsdbMerge(STsdb *pTsdb); @@ -369,13 +366,6 @@ struct STbData { STbData *next; }; -struct SQueryNode { - SQueryNode *pNext; - SQueryNode **ppNext; - void *pQHandle; - _tsdb_reseek_func_t reseek; -}; - struct SMemTable { SRWLatch latch; STsdb *pTsdb; @@ -392,7 +382,6 @@ struct SMemTable { int32_t nBucket; STbData **aBucket; }; - SQueryNode qList; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index e075d8a6ca..e0d0e9408b 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -61,10 +61,19 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* next; + SVBufPool* freeNext; + SVBufPool* recycleNext; + SVBufPool* recyclePrev; + + // query handle list + TdThreadMutex mutex; + int32_t nQuery; + SQueryNode qList; + SVnode* pVnode; - TdThreadSpinlock* lock; + int32_t id; volatile int32_t nRef; + TdThreadSpinlock* lock; int64_t size; uint8_t* ptr; SVBufPoolNode* pTail; @@ -74,6 +83,8 @@ struct SVBufPool { int32_t vnodeOpenBufPool(SVnode* pVnode); int32_t vnodeCloseBufPool(SVnode* pVnode); void vnodeBufPoolReset(SVBufPool* pPool); +void vnodeBufPoolAddToFreeList(SVBufPool* pPool); +int32_t vnodeBufPoolRecycle(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81873a38bc..9c5f47b731 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SCommitInfo SCommitInfo; +typedef struct SQueryNode SQueryNode; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" @@ -87,17 +88,29 @@ typedef struct SCommitInfo SCommitInfo; #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" +#define VNODE_BUFPOOL_SEGMENTS 3 + #define VND_INFO_FNAME "vnode.json" // vnd.h +typedef int32_t (*_query_reseek_func_t)(void* pQHandle); +struct SQueryNode { + SQueryNode* pNext; + SQueryNode** ppNext; + void* pQHandle; + _query_reseek_func_t reseek; +}; void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolRef(SVBufPool* pPool); -void vnodeBufPoolUnRef(SVBufPool* pPool); +void vnodeBufPoolUnRef(SVBufPool* pPool, bool proactive); int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); +int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode); +void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive); + // meta typedef struct SMCtbCursor SMCtbCursor; typedef struct SMStbCursor SMStbCursor; @@ -331,16 +344,24 @@ typedef struct SVCommitSched { } SVCommitSched; struct SVnode { - char* path; - SVnodeCfg config; - SVState state; - SVStatis statis; - STfs* pTfs; - SMsgCb msgCb; + char* path; + SVnodeCfg config; + SVState state; + SVStatis statis; + STfs* pTfs; + SMsgCb msgCb; + + // Buffer Pool TdThreadMutex mutex; TdThreadCond poolNotEmpty; - SVBufPool* pPool; + SVBufPool* aBufPool[VNODE_BUFPOOL_SEGMENTS]; + SVBufPool* freeList; SVBufPool* inUse; + SVBufPool* onCommit; + SVBufPool* recycleHead; + SVBufPool* recycleTail; + SVBufPool* onRecycle; + SMeta* pMeta; SSma* pSma; STsdb* pTsdb; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 1c1626ba5c..6a5acecfc3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -208,13 +208,21 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) { int32_t code = 0; SCacheRowsReader* pReader = pQHandle; - taosThreadMutexLock(&pReader->readerMutex); + code = taosThreadMutexTryLock(&pReader->readerMutex); + if (code == 0) { + // pause current reader's state if not paused, save ts & version for resuming + // just wait for the big all tables' snapshot untaking for now - // pause current reader's state if not paused, save ts & version for resuming - // just wait for the big all tables' snapshot untaking for now + code = TSDB_CODE_VND_QUERY_BUSY; - taosThreadMutexUnlock(&pReader->readerMutex); - return code; + taosThreadMutexUnlock(&pReader->readerMutex); + + return code; + } else if (code == EBUSY) { + return TSDB_CODE_VND_QUERY_BUSY; + } else { + return -1; + } } int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { @@ -375,7 +383,7 @@ _end: tsdbDataFReaderClose(&pr->pDataFReader); resetLastBlockLoadInfo(pr->pLoadInfo); - tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap); + tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true); taosThreadMutexUnlock(&pr->readerMutex); for (int32_t j = 0; j < pr->numOfCols; ++j) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 31d6fd85c6..d15f848cfd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) { pTsdb->imem = NULL; taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbUnrefMemTable(pMemTable, NULL); + tsdbUnrefMemTable(pMemTable, NULL, true); goto _exit; } @@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) { // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); if (pMemTable) { - tsdbUnrefMemTable(pMemTable, NULL); + tsdbUnrefMemTable(pMemTable, NULL, true); } _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 6bbaa31ed3..d68afbed1f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -64,8 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosMemoryFree(pMemTable); goto _err; } - pMemTable->qList.pNext = &pMemTable->qList; - pMemTable->qList.ppNext = &pMemTable->qList.pNext; + // pMemTable->qList.pNext = &pMemTable->qList; + // pMemTable->qList.ppNext = &pMemTable->qList.pNext; vnodeBufPoolRef(pMemTable->pPool); *ppMemTable = pMemTable; @@ -76,9 +76,9 @@ _err: return code; } -void tsdbMemTableDestroy(SMemTable *pMemTable) { +void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive) { if (pMemTable) { - vnodeBufPoolUnRef(pMemTable->pPool); + vnodeBufPoolUnRef(pMemTable->pPool, proactive); taosMemoryFree(pMemTable->aBucket); taosMemoryFree(pMemTable); } @@ -749,42 +749,27 @@ _exit: int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } -int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) { +int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode) { int32_t code = 0; int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); ASSERT(nRef > 0); - /* - // register handle (todo: take concurrency in consideration) - *ppNode = taosMemoryMalloc(sizeof(SQueryNode)); - if (*ppNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - (*ppNode)->pQHandle = pQHandle; - (*ppNode)->reseek = reseek; - (*ppNode)->pNext = pMemTable->qList.pNext; - (*ppNode)->ppNext = &pMemTable->qList.pNext; - pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext; - pMemTable->qList.pNext = *ppNode; - */ + + vnodeBufPoolRegisterQuery(pMemTable->pPool, pQNode); + _exit: return code; } -int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) { +int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive) { int32_t code = 0; - /* - // unregister handle (todo: take concurrency in consideration) + if (pNode) { - pNode->pNext->ppNext = pNode->ppNext; - *pNode->ppNext = pNode->pNext; - taosMemoryFree(pNode); + vnodeBufPoolDeregisterQuery(pMemTable->pPool, pNode, proactive); } - */ - int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); - if (nRef == 0) { - tsdbMemTableDestroy(pMemTable); + + if (atomic_sub_fetch_32(&pMemTable->nRef, 1) == 0) { + tsdbMemTableDestroy(pMemTable, proactive); } return code; @@ -828,28 +813,28 @@ _exit: return aTbDataP; } -int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { - int32_t code = 0; +// int32_t tsdbRecycleMemTable(SMemTable *pMemTable) { +// int32_t code = 0; - SQueryNode *pNode = pMemTable->qList.pNext; - while (1) { - ASSERT(pNode != &pMemTable->qList); - SQueryNode *pNextNode = pNode->pNext; +// SQueryNode *pNode = pMemTable->qList.pNext; +// while (1) { +// ASSERT(pNode != &pMemTable->qList); +// SQueryNode *pNextNode = pNode->pNext; - if (pNextNode == &pMemTable->qList) { - code = (*pNode->reseek)(pNode->pQHandle); - if (code) goto _exit; - break; - } else { - code = (*pNode->reseek)(pNode->pQHandle); - if (code) goto _exit; - pNode = pMemTable->qList.pNext; - ASSERT(pNode == pNextNode); - } - } +// if (pNextNode == &pMemTable->qList) { +// code = (*pNode->reseek)(pNode->pQHandle); +// if (code) goto _exit; +// break; +// } else { +// code = (*pNode->reseek)(pNode->pQHandle); +// if (code) goto _exit; +// pNode = pMemTable->qList.pNext; +// ASSERT(pNode == pNextNode); +// } +// } - // NOTE: Take care here, pMemTable is destroyed +// // NOTE: Take care here, pMemTable is destroyed -_exit: - return code; -} +// _exit: +// return code; +// } diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index f71b5b6706..8901f64459 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -88,7 +88,7 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { taosThreadRwlockWrlock(&(*pTsdb)->rwLock); - tsdbMemTableDestroy((*pTsdb)->mem); + tsdbMemTableDestroy((*pTsdb)->mem, true); (*pTsdb)->mem = NULL; taosThreadRwlockUnlock(&(*pTsdb)->rwLock); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e658d10b3f..5ba44decb4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3953,7 +3953,7 @@ void tsdbReaderClose(STsdbReader* pReader) { } qTrace("tsdb/reader: %p, untake snapshot", pReader); - tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true); taosThreadMutexDestroy(&pReader->readerMutex); @@ -4001,7 +4001,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo != NULL) { - pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pBlockScanInfo = + *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pBlockScanInfo == NULL) { code = TSDB_CODE_INVALID_PARA; tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, @@ -4015,20 +4016,28 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); // resetDataBlockScanInfo excluding lastKey - STableBlockScanInfo* p = NULL; + STableBlockScanInfo** p = NULL; while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) { - p->iterInit = false; - p->iiter.hasVal = false; - if (p->iter.iter != NULL) { - p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; + + pInfo->iterInit = false; + pInfo->iter.hasVal = false; + pInfo->iiter.hasVal = false; + + if (pInfo->iter.iter != NULL) { + pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); } - p->delSkyline = taosArrayDestroy(p->delSkyline); - // p->lastKey = ts; + if (pInfo->iiter.iter != NULL) { + pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); + } + + pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); + // pInfo->lastKey = ts; } } else { - pBlockScanInfo = *pStatus->pTableIter; + pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter; if (pBlockScanInfo) { // save lastKey to restore memory iterator STimeWindow w = pReader->pResBlock->info.window; @@ -4052,11 +4061,13 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { } } - tsdbUntakeReadSnap(pReader, pReader->pReadSnap); + tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false); + pReader->pReadSnap = NULL; pReader->suspended = true; - tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr); + tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, + pReader->idStr); return code; _err: @@ -4068,18 +4079,24 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { int32_t code = 0; STsdbReader* pReader = pQHandle; - taosThreadMutexLock(&pReader->readerMutex); + code = taosThreadMutexTryLock(&pReader->readerMutex); + if (code == 0) { + if (pReader->suspended) { + taosThreadMutexUnlock(&pReader->readerMutex); + return code; + } + + tsdbReaderSuspend(pReader); - if (pReader->suspended) { taosThreadMutexUnlock(&pReader->readerMutex); + return code; + } else if (code == EBUSY) { + return TSDB_CODE_VND_QUERY_BUSY; + } else { + terrno = TAOS_SYSTEM_ERROR(code); + return TSDB_CODE_FAILED; } - - tsdbReaderSuspend(pReader); - - taosThreadMutexUnlock(&pReader->readerMutex); - - return code; } int32_t tsdbReaderResume(STsdbReader* pReader) { @@ -4428,17 +4445,18 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { SReaderStatus* pStatus = &pReader->status; - qTrace("tsdb/read: %p, take read mutex", pReader); + qTrace("tsdb/reader-reset: %p, take read mutex", pReader); taosThreadMutexLock(&pReader->readerMutex); if (pReader->suspended) { tsdbReaderResume(pReader); } - taosThreadMutexUnlock(&pReader->readerMutex); - if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap); + + taosThreadMutexUnlock(&pReader->readerMutex); + return TSDB_CODE_SUCCESS; } @@ -4474,6 +4492,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { if (code != TSDB_CODE_SUCCESS) { tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + + taosThreadMutexUnlock(&pReader->readerMutex); + return code; } } @@ -4483,6 +4504,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr); + taosThreadMutexUnlock(&pReader->readerMutex); + return code; } @@ -4639,68 +4662,91 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } -int32_t tsdbTakeReadSnap(STsdbReader* pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap** ppSnap) { +int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->verRange; // alloc - *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); - if (*ppSnap == NULL) { + STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap)); + if (pSnap == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } // lock - code = taosThreadRwlockRdlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _exit; - } + taosThreadRwlockRdlock(&pTsdb->rwLock); // take snapshot if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) { - tsdbRefMemTable(pTsdb->mem, pReader, reseek, &(*ppSnap)->pNode); - (*ppSnap)->pMem = pTsdb->mem; + pSnap->pMem = pTsdb->mem; + pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); + if (pSnap->pNode == NULL) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pSnap->pNode->pQHandle = pReader; + pSnap->pNode->reseek = reseek; + + tsdbRefMemTable(pTsdb->mem, pSnap->pNode); } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { - tsdbRefMemTable(pTsdb->imem, pReader, reseek, &(*ppSnap)->pINode); - (*ppSnap)->pIMem = pTsdb->imem; + pSnap->pIMem = pTsdb->imem; + pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); + if (pSnap->pINode == NULL) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pSnap->pINode->pQHandle = pReader; + pSnap->pINode->reseek = reseek; + + tsdbRefMemTable(pTsdb->imem, pSnap->pINode); } // fs - code = tsdbFSRef(pTsdb, &(*ppSnap)->fs); + code = tsdbFSRef(pTsdb, &pSnap->fs); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _exit; } // unlock - code = taosThreadRwlockUnlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - goto _exit; - } + taosThreadRwlockUnlock(&pTsdb->rwLock); tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); + _exit: + if (code) { + *ppSnap = NULL; + if (pSnap) { + if (pSnap->pNode) taosMemoryFree(pSnap->pNode); + if (pSnap->pINode) taosMemoryFree(pSnap->pINode); + taosMemoryFree(pSnap); + } + } else { + *ppSnap = pSnap; + } return code; } -void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap) { +void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) { STsdb* pTsdb = pReader->pTsdb; if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode); + tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode); + tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); } tsdbFSUnref(pTsdb, &pSnap->fs); + if (pSnap->pNode) taosMemoryFree(pSnap->pNode); + if (pSnap->pINode) taosMemoryFree(pSnap->pINode); taosMemoryFree(pSnap); } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 139ee23266..80a422ca3b 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -16,9 +16,7 @@ #include "vnd.h" /* ------------------------ STRUCTURES ------------------------ */ -#define VNODE_BUFPOOL_SEGMENTS 3 - -static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { +static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); @@ -26,6 +24,21 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + memset(pPool, 0, sizeof(SVBufPool)); + + // query handle list + taosThreadMutexInit(&pPool->mutex, NULL); + pPool->nQuery = 0; + pPool->qList.pNext = &pPool->qList; + pPool->qList.ppNext = &pPool->qList.pNext; + + pPool->pVnode = pVnode; + pPool->id = id; + pPool->ptr = pPool->node.data; + pPool->pTail = &pPool->node; + pPool->node.prev = NULL; + pPool->node.pnext = &pPool->pTail; + pPool->node.size = size; if (VND_IS_RSMA(pVnode)) { pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock)); @@ -44,16 +57,6 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) pPool->lock = NULL; } - pPool->next = NULL; - pPool->pVnode = pVnode; - pPool->nRef = 0; - pPool->size = 0; - pPool->ptr = pPool->node.data; - pPool->pTail = &pPool->node; - pPool->node.prev = NULL; - pPool->node.pnext = &pPool->pTail; - pPool->node.size = size; - *ppPool = pPool; return 0; } @@ -64,27 +67,25 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { taosThreadSpinDestroy(pPool->lock); taosMemoryFree((void *)pPool->lock); } + taosThreadMutexDestroy(&pPool->mutex); taosMemoryFree(pPool); return 0; } int vnodeOpenBufPool(SVnode *pVnode) { - SVBufPool *pPool = NULL; - int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - - ASSERT(pVnode->pPool == NULL); + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { // create pool - if (vnodeBufPoolCreate(pVnode, size, &pPool)) { + if (vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i])) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); return -1; } - // add pool to vnode - pPool->next = pVnode->pPool; - pVnode->pPool = pPool; + // add to free list + pVnode->aBufPool[i]->freeNext = pVnode->freeList; + pVnode->freeList = pVnode->aBufPool[i]; } vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size); @@ -92,23 +93,19 @@ int vnodeOpenBufPool(SVnode *pVnode) { } int vnodeCloseBufPool(SVnode *pVnode) { - SVBufPool *pPool; - - for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) { - pVnode->pPool = pPool->next; - vnodeBufPoolDestroy(pPool); + for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { + if (pVnode->aBufPool[i]) { + vnodeBufPoolDestroy(pVnode->aBufPool[i]); + pVnode->aBufPool[i] = NULL; + } } - if (pVnode->inUse) { - vnodeBufPoolDestroy(pVnode->inUse); - pVnode->inUse = NULL; - } vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); - return 0; } void vnodeBufPoolReset(SVBufPool *pPool) { + ASSERT(pPool->nQuery == 0); for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) { ASSERT(pNode->pnext == &pPool->pTail); pNode->prev->pnext = &pPool->pTail; @@ -215,35 +212,121 @@ void vnodeBufPoolRef(SVBufPool *pPool) { ASSERT(nRef > 0); } -void vnodeBufPoolUnRef(SVBufPool *pPool) { - if (pPool == NULL) { - return; +void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { + SVnode *pVnode = pPool->pVnode; + + int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; + if (pPool->node.size != size) { + SVBufPool *pNewPool = NULL; + if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { + vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", + TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); + } else { + vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, + pPool->node.size, size); + + vnodeBufPoolDestroy(pPool); + pPool = pNewPool; + pVnode->aBufPool[pPool->id] = pPool; + } } - int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); - if (nRef == 0) { - SVnode *pVnode = pPool->pVnode; - vnodeBufPoolReset(pPool); + // add to free list + vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); + vnodeBufPoolReset(pPool); + pPool->freeNext = pVnode->freeList; + pVnode->freeList = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); +} - taosThreadMutexLock(&pVnode->mutex); +void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) { + if (pPool == NULL) return; - int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS; - if (pPool->node.size != size) { - SVBufPool *pPoolT = NULL; - if (vnodeBufPoolCreate(pVnode, size, &pPoolT) < 0) { - vWarn("vgId:%d, try to change buf pools size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode), - pPool->node.size, size, tstrerror(errno)); - } else { - vnodeBufPoolDestroy(pPool); - pPool = pPoolT; - vDebug("vgId:%d, change buf pools size from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->node.size, size); - } + SVnode *pVnode = pPool->pVnode; + + if (proactive) taosThreadMutexLock(&pVnode->mutex); + + if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; + + // remove from recycle queue or on-recycle position + if (pVnode->onRecycle == pPool) { + pVnode->onRecycle = NULL; + } else { + ASSERT(proactive); + + if (pPool->recyclePrev) { + pPool->recyclePrev->recycleNext = pPool->recycleNext; + } else { + pVnode->recycleHead = pPool->recycleNext; } - pPool->next = pVnode->pPool; - pVnode->pPool = pPool; - taosThreadCondSignal(&pVnode->poolNotEmpty); - - taosThreadMutexUnlock(&pVnode->mutex); + if (pPool->recycleNext) { + pPool->recycleNext->recyclePrev = pPool->recyclePrev; + } else { + pVnode->recycleTail = pPool->recyclePrev; + } + pPool->recyclePrev = pPool->recycleNext = NULL; } + + vnodeBufPoolAddToFreeList(pPool); + +_exit: + if (proactive) taosThreadMutexUnlock(&pVnode->mutex); + return; } + +int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { + int32_t code = 0; + + taosThreadMutexLock(&pPool->mutex); + + pQNode->pNext = pPool->qList.pNext; + pQNode->ppNext = &pPool->qList.pNext; + pPool->qList.pNext->ppNext = &pQNode->pNext; + pPool->qList.pNext = pQNode; + pPool->nQuery++; + + taosThreadMutexUnlock(&pPool->mutex); + +_exit: + return code; +} + +void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) { + int32_t code = 0; + + if (proactive) taosThreadMutexLock(&pPool->mutex); + + pQNode->pNext->ppNext = pQNode->ppNext; + *pQNode->ppNext = pQNode->pNext; + pPool->nQuery--; + + if (proactive) taosThreadMutexUnlock(&pPool->mutex); +} + +int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { + int32_t code = 0; + + SVnode *pVnode = pPool->pVnode; + + vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); + + taosThreadMutexLock(&pPool->mutex); + + SQueryNode *pNode = pPool->qList.pNext; + while (pNode != &pPool->qList) { + SQueryNode *pTNode = pNode->pNext; + + int32_t rc = pNode->reseek(pNode->pQHandle); + if (rc == 0 || rc == TSDB_CODE_VND_QUERY_BUSY) { + pNode = pTNode; + } else { + code = rc; + goto _exit; + } + } + +_exit: + taosThreadMutexUnlock(&pPool->mutex); + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index b42bdf564d..63c05ce88c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -21,41 +21,128 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeCommitImpl(SCommitInfo *pInfo); -int vnodeBegin(SVnode *pVnode) { - // alloc buffer pool - taosThreadMutexLock(&pVnode->mutex); +#define WAIT_TIME_MILI_SEC 10 // miliseconds - while (pVnode->pPool == NULL) { - taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); +static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { + int32_t code = 0; + + if (pVnode->onRecycle == NULL) { + if (pVnode->recycleHead == NULL) { + vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); + goto _exit; + } else { + vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + pVnode->recycleHead->id); + + pVnode->onRecycle = pVnode->recycleHead; + if (pVnode->recycleHead == pVnode->recycleTail) { + pVnode->recycleHead = pVnode->recycleTail = NULL; + } else { + pVnode->recycleHead = pVnode->recycleHead->recycleNext; + pVnode->recycleHead->recyclePrev = NULL; + } + pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL; + } } - pVnode->inUse = pVnode->pPool; - pVnode->inUse->nRef = 1; - pVnode->pPool = pVnode->inUse->next; - pVnode->inUse->next = NULL; + code = vnodeBufPoolRecycle(pVnode->onRecycle); + if (code) goto _exit; +_exit: + if (code) { + vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); + } + return code; +} +static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { + int32_t code = 0; + int32_t lino = 0; + + taosThreadMutexLock(&pVnode->mutex); + + int32_t nTry = 0; + for (;;) { + ++nTry; + + if (pVnode->freeList) { + vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, + pVnode->freeList->id); + + pVnode->inUse = pVnode->freeList; + pVnode->inUse->nRef = 1; + pVnode->freeList = pVnode->inUse->freeNext; + pVnode->inUse->freeNext = NULL; + break; + } else { + vDebug("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); + /* + code = vnodeTryRecycleBufPool(pVnode); + TSDB_CHECK_CODE(code, lino, _exit); + */ + if (pVnode->freeList == NULL) { + vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); + + struct timeval tv; + struct timespec ts; + taosGetTimeOfDay(&tv); + ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000; + if (ts.tv_nsec > 999999999l) { + ts.tv_sec = tv.tv_sec + 1; + ts.tv_nsec -= 1000000000l; + } else { + ts.tv_sec = tv.tv_sec; + } + + int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts); + if (rc && rc != ETIMEDOUT) { + code = TAOS_SYSTEM_ERROR(rc); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } + } + +_exit: taosThreadMutexUnlock(&pVnode->mutex); + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; +} +int vnodeBegin(SVnode *pVnode) { + int32_t code = 0; + int32_t lino = 0; pVnode->state.commitID++; + + // alloc buffer pool + code = vnodeGetBufPoolToUse(pVnode); + TSDB_CHECK_CODE(code, lino, _exit); + // begin meta if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) { - vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // begin tsdb if (tsdbBegin(pVnode->pTsdb) < 0) { - vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // begin sma if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) { - vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } - return 0; +_exit: + if (code) { + terrno = code; + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; } void vnodeUpdCommitSched(SVnode *pVnode) { @@ -70,7 +157,7 @@ int vnodeShouldCommit(SVnode *pVnode) { } SVCommitSched *pSched = &pVnode->commitSched; - int64_t nowMs = taosGetMonoTimestampMs(); + int64_t nowMs = taosGetMonoTimestampMs(); return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); @@ -209,6 +296,12 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { tsem_wait(&pVnode->canCommit); + taosThreadMutexLock(&pVnode->mutex); + ASSERT(pVnode->onCommit == NULL); + pVnode->onCommit = pVnode->inUse; + pVnode->inUse = NULL; + taosThreadMutexUnlock(&pVnode->mutex); + pVnode->state.commitTerm = pVnode->state.applyTerm; pInfo->info.config = pVnode->config; @@ -238,9 +331,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { code = smaPrepareAsyncCommit(pVnode->pSma); if (code) goto _exit; - vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; - _exit: if (code) { vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino, @@ -251,19 +341,48 @@ _exit: return code; } +static void vnodeReturnBufPool(SVnode *pVnode) { + taosThreadMutexLock(&pVnode->mutex); + SVBufPool *pPool = pVnode->onCommit; + int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); + + pVnode->onCommit = NULL; + if (nRef == 0) { + vnodeBufPoolAddToFreeList(pPool); + } else if (nRef > 0) { + vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); + + if (pVnode->recycleTail == NULL) { + pPool->recyclePrev = pPool->recycleNext = NULL; + pVnode->recycleHead = pVnode->recycleTail = pPool; + } else { + pPool->recyclePrev = pVnode->recycleTail; + pPool->recycleNext = NULL; + pVnode->recycleTail->recycleNext = pPool; + pVnode->recycleTail = pPool; + } + } else { + ASSERT(0); + } + + taosThreadMutexUnlock(&pVnode->mutex); +} static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; SCommitInfo *pInfo = (SCommitInfo *)arg; + SVnode *pVnode = pInfo->pVnode; // commit code = vnodeCommitImpl(pInfo); if (code) goto _exit; + vnodeReturnBufPool(pVnode); + _exit: // end commit - tsem_post(&pInfo->pVnode->canCommit); + tsem_post(&pVnode->canCommit); taosMemoryFree(pInfo); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 8c07d0cec7..52c1f45762 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -240,7 +240,7 @@ _err: if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); - if (pVnode->pPool) vnodeCloseBufPool(pVnode); + if (pVnode->freeList) vnodeCloseBufPool(pVnode); tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index deb9da2050..6abc144b91 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -191,13 +191,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp void *pReq; int32_t len; int32_t ret; - + /* if (!pVnode->inUse) { terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL; vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr()); return -1; } - + */ if (version <= pVnode->state.applied) { vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, pVnode->state.applied); @@ -1001,7 +1001,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq code = terrno; goto _exit; } - pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid; // update uid if table exist for using below + pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid; // update uid if table exist for using below } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a142bb76d7..bab3edc870 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -320,6 +320,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subsc TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped") TAOS_DEFINE_ERROR(TSDB_CODE_VND_DUP_REQUEST, "Duplicate write request") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_QUERY_BUSY, "Query busy") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")