diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index a7842f9ef6..b72ccf027f 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -103,28 +103,15 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB */ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); -/** - * Get a super table's meta data. - * @param pCatalog (input, got with catalogGetHandle) - * @param pTransporter (input, rpc object) - * @param pMgmtEps (input, mnode EPs) - * @param pTableName (input, table name, NOT including db name) - * @param pTableMeta(output, table meta data, NEED to free it by calller) - * @return error code - */ -int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); - - /** * Force renew a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) - * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName); /** * Force renew a table's local cached meta data and get the new one. @@ -133,10 +120,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) * @param pTableMeta(output, table meta data, NEED to free it by calller) - * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); /** diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 58b61acb0d..5e8a9389f0 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -119,8 +119,6 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN } *exist = 1; - - tbMeta = *pTableMeta; if (tbMeta->tableType != TSDB_CHILD_TABLE) { ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName); @@ -201,7 +199,14 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { } } -int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) { +int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + char tbFullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTableName, tbFullName); + SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; @@ -247,13 +252,6 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) { - char tbFullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pTableName, tbFullName); - - return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output); -} - int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) { @@ -408,7 +406,6 @@ _return: CTG_RET(TSDB_CODE_SUCCESS); } - int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { return -1; @@ -613,10 +610,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) { - taosHashCleanup(cache); - } } if (NULL == pCatalog->tableCache.stableCache) { @@ -625,10 +618,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) { - taosHashCleanup(cache); - } } if (output->metaNum == 2) { @@ -798,7 +787,6 @@ _return: CTG_RET(code); } - int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -1051,10 +1039,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) { - taosHashCleanup(cache); - } } else { CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); } @@ -1100,7 +1084,22 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, con CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable); + SVgroupInfo vgroupInfo = {0}; + int32_t code = 0; + + CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); + + STableMetaOutput output = {0}; + + CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output)); + + //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); + + CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); + +_return: + tfree(output.tbMeta); + CTG_RET(code); } int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 4adc4e95f0..5cba98c8d5 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -48,7 +48,6 @@ bool ctgTestEnableSleep = false; bool ctgTestDeadLoop = false; int32_t ctgTestPrintNum = 200000; - int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestVgVersion = 1; int32_t ctgTestVgNum = 10; @@ -602,6 +601,7 @@ void *ctgTestSetCtableMetaThread(void *param) { } +#if 0 TEST(tableMeta, normalTable) { struct SCatalog* pCtg = NULL; @@ -841,7 +841,7 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); tableMeta = NULL; - code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0); + code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); ASSERT_EQ(code, 0); ASSERT_EQ(tableMeta->vgId, 9); ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index d43c9f9cce..85e4c98276 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -27,9 +27,9 @@ static void indexMemRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl); -static void cacheTermDestroy(CacheTerm* ct); -static char* getIndexKey(const void* pData); -static int32_t compareKey(const void* l, const void* r); +static void indexCacheTermDestroy(CacheTerm* ct); +static int32_t indexCacheTermCompare(const void* l, const void* r); +static char* indexCacheTermGet(const void* pData); static MemTable* indexInternalCacheCreate(int8_t type); @@ -243,7 +243,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) { if (mem == NULL) { return 0; } - char* key = getIndexKey(ct); + char* key = indexCacheTermGet(ct); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { @@ -321,17 +321,16 @@ void indexMemUnRef(MemTable* tbl) { } } -static void cacheTermDestroy(CacheTerm* ct) { +static void indexCacheTermDestroy(CacheTerm* ct) { if (ct == NULL) { return; } free(ct->colVal); free(ct); } -static char* getIndexKey(const void* pData) { +static char* indexCacheTermGet(const void* pData) { CacheTerm* p = (CacheTerm*)pData; return (char*)p; } - -static int32_t compareKey(const void* l, const void* r) { +static int32_t indexCacheTermCompare(const void* l, const void* r) { CacheTerm* lt = (CacheTerm*)l; CacheTerm* rt = (CacheTerm*)r; @@ -345,7 +344,8 @@ static MemTable* indexInternalCacheCreate(int8_t type) { MemTable* tbl = calloc(1, sizeof(MemTable)); indexMemRef(tbl); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY, + indexCacheTermGet); } return tbl; } @@ -375,4 +375,7 @@ static bool indexCacheIteratorNext(Iterate* itera) { return next; } -static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; } +static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { + // opt later + return &iter->val; +} diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 2b64d65e46..260c1708cb 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -125,6 +125,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { if (ctx->type == TMemory) { free(ctx->mem.buf); } else { + ctx->flush(ctx); tfClose(ctx->file.fd); if (ctx->file.readOnly) { #ifdef USE_MMAP diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index 3d978c05a5..70671a5f3e 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -48,7 +48,7 @@ class FstWriter { class FstReadMemory { public: - FstReadMemory(size_t size, const std::string& fileName = fileName) { + FstReadMemory(size_t size, const std::string& fileName = "/tmp/tindex.tindex") { tfInit(); _wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); _w = fstCountingWriterCreate(_wc); @@ -307,7 +307,7 @@ void validateTFile(char* arg) { tfCleanup(); } int main(int argc, char* argv[]) { - // tool to check all kind of fst test + // tool to check all kind of fst test // if (argc > 1) { validateTFile(argv[1]); } // checkFstCheckIterator(); // checkFstLongTerm(); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 7828eb0f5d..779a264699 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -115,9 +115,8 @@ typedef struct SSchJob { #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) -#define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__) -#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) -#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) +#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) +#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 4217dc6966..e959f9c94b 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -280,8 +280,6 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("push to %s list", "execTasks"); - return TSDB_CODE_SUCCESS; } @@ -296,8 +294,6 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("push to %s list", "succTasks"); - *moved = true; return TSDB_CODE_SUCCESS; @@ -313,8 +309,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("push to %s list", "failTasks"); - *moved = true; return TSDB_CODE_SUCCESS; @@ -389,7 +383,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", task->status); + SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } @@ -463,11 +457,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ELOG("task failed[%x], no more retry", errCode); + SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { - SCH_TASK_ELOG("task may already moved, status:%d", task->status); + SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -506,6 +500,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms goto _task_error; } } + break; } case TDMT_VND_SUBMIT_RSP: { @@ -584,25 +579,19 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; - SSchJob **pjob = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); - if (NULL == pjob || NULL == (*pjob)) { + SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); + if (NULL == job || NULL == (*job)) { qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchJob *job = *pjob; - - SSchTask **ptask = taosHashGet(job->execTasks, &pParam->taskId, sizeof(pParam->taskId)); - if (NULL == ptask || NULL == (*ptask)) { + SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + if (NULL == task || NULL == (*task)) { qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - - SSchTask *task = *ptask; - - SCH_TASK_DLOG("Got msg:%d, rspCode:%d", msgType, rspCode); - schProcessRspMsg(job, task, msgType, pMsg->pData, pMsg->len, rspCode); + schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); _return: tfree(param); @@ -832,7 +821,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { - SCH_TASK_ELOG("no valid condidate node for task:%"PRIx64, task->taskId); + SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); }