diff --git a/include/util/taoserror.h b/include/util/taoserror.h index eb68f52a40..580a3dd7db 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -470,6 +470,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405) #define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) #define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407) +#define TSDB_CODE_CTG_EXIT TAOS_DEF_ERROR_CODE(0, 0x2408) //scheduler&qworker #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 98a03aa39b..31440608a5 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -381,6 +381,7 @@ typedef struct SCtgCacheOperation { void *data; bool syncOp; tsem_t rspSem; + bool lockQ; } SCtgCacheOperation; typedef struct SCtgQNode { @@ -390,6 +391,7 @@ typedef struct SCtgQNode { typedef struct SCtgQueue { SRWLatch qlock; + bool lockQ; SCtgQNode *head; SCtgQNode *tail; tsem_t reqSem; @@ -509,8 +511,20 @@ typedef struct SCtgOperation { #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0) -#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) +#define CTG_API_LEAVE(c) do { \ + int32_t __code = c; \ + CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \ + CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \ + CTG_RET(__code); \ +} while (0) + +#define CTG_API_ENTER() do { \ + CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \ + CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \ + if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \ + CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); \ + } \ +} while (0) void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p); void ctgdShowClusterCache(SCatalog* pCtg); @@ -543,7 +557,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp); -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool syncOp); +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp); int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type); int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 44730cd3b5..fdada1cb0b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1255,7 +1255,7 @@ int32_t catalogClearCache(void) { CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - int32_t code = ctgClearCacheEnqueue(NULL, true); + int32_t code = ctgClearCacheEnqueue(NULL, false, true); qInfo("clear catalog cache end, code: %s", tstrerror(code)); @@ -1272,32 +1272,14 @@ void catalogDestroy(void) { atomic_store_8((int8_t*)&gCtgMgmt.exit, true); + ctgClearCacheEnqueue(NULL, true, true); + if (tsem_post(&gCtgMgmt.queue.reqSem)) { qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); } - while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { - taosUsleep(1); - } - - CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); - - SCatalog* pCtg = NULL; - void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); - while (pIter) { - pCtg = *(SCatalog**)pIter; - - if (pCtg) { - catalogFreeHandle(pCtg); - } - - pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); - } - taosHashCleanup(gCtgMgmt.pCluster); gCtgMgmt.pCluster = NULL; - if (CTG_IS_LOCKED(&gCtgMgmt.lock) == TD_RWLATCH_WRITE_FLAG_COPY) CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); - qInfo("catalog destroyed"); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index eeb627624b..9af227a9fd 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -659,6 +659,12 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { node->op = operation; CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + if (gCtgMgmt.queue.lockQ) { + ctgFreeQNode(node); + CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + CTG_RET(TSDB_CODE_CTG_EXIT); + } + gCtgMgmt.queue.lockQ = operation->lockQ; gCtgMgmt.queue.tail->next = node; gCtgMgmt.queue.tail = node; CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); @@ -996,11 +1002,12 @@ _return: } -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool syncOp) { +int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp) { int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_CLEAR_CACHE; op->syncOp = syncOp; + op->lockQ = lockQ; SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); if (NULL == msg) { @@ -1520,6 +1527,24 @@ _return: CTG_RET(code); } +void ctgClearAllCtgInstance(void) { + SCatalog* pCtg = NULL; + + void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + while (pIter) { + pCtg = *(SCatalog**)pIter; + + if (pCtg) { + catalogFreeHandle(pCtg); + } + + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); + } + + taosHashClear(gCtgMgmt.pCluster); +} + + int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { int32_t code = 0; SCtgUpdateVgMsg *msg = operation->data; @@ -1942,18 +1967,7 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) { goto _return; } - void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); - while (pIter) { - pCtg = *(SCatalog**)pIter; - - if (pCtg) { - catalogFreeHandle(pCtg); - } - - pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); - } - - taosHashClear(gCtgMgmt.pCluster); + ctgClearAllCtgInstance(); _return: @@ -1962,11 +1976,6 @@ _return: CTG_RET(code); } - -void ctgUpdateThreadUnexpectedStopped(void) { - if (!atomic_load_8((int8_t*)&gCtgMgmt.exit) && CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); -} - void ctgCleanupCacheQueue(void) { SCtgQNode *node = NULL; SCtgQNode *nodeNext = NULL; @@ -2002,22 +2011,15 @@ void ctgCleanupCacheQueue(void) { void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); -#ifdef WINDOWS - if (taosCheckCurrentInDll()) { - atexit(ctgUpdateThreadUnexpectedStopped); - } -#endif + qInfo("catalog update thread started"); - CTG_LOCK(CTG_READ, &gCtgMgmt.lock); - while (true) { if (tsem_wait(&gCtgMgmt.queue.reqSem)) { qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); } if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { - CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); ctgCleanupCacheQueue(); break; } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 8635457dfe..9fd7f70f92 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -48,6 +48,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) { } } +void ctgFreeQNode(SCtgQNode *node) { + //TODO +} + void ctgFreeSTableIndex(void *info) { if (NULL == info) { return; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 734c6bfbfb..32ca43996e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1523,11 +1523,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { int64_t startTs = taosGetTimestampUs(); strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); +/* if (pInfo->showRewrite) { char dbName[TSDB_DB_NAME_LEN] = {0}; getDBNameFromCondition(pInfo->pCondition, dbName); sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); } +*/ int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b89e908df7..31c4fb80c9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -457,6 +457,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error" TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup mismatch") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_EXIT, "catalog exit") //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")