Merge remote-tracking branch 'origin/3.0' into feature/qnode
This commit is contained in:
commit
32b1e1bf3a
|
@ -103,28 +103,15 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
*/
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
/**
|
||||
* Get a super table's meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pTransporter (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
|
||||
/**
|
||||
* Force renew a table's local cached meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pTransporter (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName);
|
||||
|
||||
/**
|
||||
* Force renew a table's local cached meta data and get the new one.
|
||||
|
@ -133,10 +120,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
|
|||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -119,8 +119,6 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
|
|||
}
|
||||
|
||||
*exist = 1;
|
||||
|
||||
tbMeta = *pTableMeta;
|
||||
|
||||
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);
|
||||
|
@ -201,7 +199,14 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
|
||||
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFullName);
|
||||
|
||||
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
|
||||
char *msg = NULL;
|
||||
SEpSet *pVnodeEpSet = NULL;
|
||||
|
@ -247,13 +252,6 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, cons
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFullName);
|
||||
|
||||
return ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, tbFullName, output);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
|
||||
|
@ -408,7 +406,6 @@ _return:
|
|||
CTG_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
|
||||
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
|
||||
return -1;
|
||||
|
@ -613,10 +610,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) {
|
||||
taosHashCleanup(cache);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pCatalog->tableCache.stableCache) {
|
||||
|
@ -625,10 +618,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) {
|
||||
taosHashCleanup(cache);
|
||||
}
|
||||
}
|
||||
|
||||
if (output->metaNum == 2) {
|
||||
|
@ -798,7 +787,6 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
|
@ -1051,10 +1039,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) {
|
||||
taosHashCleanup(cache);
|
||||
}
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
|
||||
}
|
||||
|
@ -1100,7 +1084,22 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, con
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable);
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
|
||||
|
||||
STableMetaOutput output = {0};
|
||||
|
||||
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
|
||||
|
||||
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||
|
||||
_return:
|
||||
tfree(output.tbMeta);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
|
||||
|
|
|
@ -48,7 +48,6 @@ bool ctgTestEnableSleep = false;
|
|||
bool ctgTestDeadLoop = false;
|
||||
int32_t ctgTestPrintNum = 200000;
|
||||
|
||||
|
||||
int32_t ctgTestCurrentVgVersion = 0;
|
||||
int32_t ctgTestVgVersion = 1;
|
||||
int32_t ctgTestVgNum = 10;
|
||||
|
@ -602,6 +601,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
|
|||
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
TEST(tableMeta, normalTable) {
|
||||
struct SCatalog* pCtg = NULL;
|
||||
|
@ -841,7 +841,7 @@ TEST(tableMeta, superTableCase) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
tableMeta = NULL;
|
||||
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
|
||||
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(tableMeta->vgId, 9);
|
||||
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
|
||||
|
|
|
@ -27,9 +27,9 @@
|
|||
static void indexMemRef(MemTable* tbl);
|
||||
static void indexMemUnRef(MemTable* tbl);
|
||||
|
||||
static void cacheTermDestroy(CacheTerm* ct);
|
||||
static char* getIndexKey(const void* pData);
|
||||
static int32_t compareKey(const void* l, const void* r);
|
||||
static void indexCacheTermDestroy(CacheTerm* ct);
|
||||
static int32_t indexCacheTermCompare(const void* l, const void* r);
|
||||
static char* indexCacheTermGet(const void* pData);
|
||||
|
||||
static MemTable* indexInternalCacheCreate(int8_t type);
|
||||
|
||||
|
@ -243,7 +243,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
|
|||
|
||||
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
|
||||
if (mem == NULL) { return 0; }
|
||||
char* key = getIndexKey(ct);
|
||||
char* key = indexCacheTermGet(ct);
|
||||
|
||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
|
@ -321,17 +321,16 @@ void indexMemUnRef(MemTable* tbl) {
|
|||
}
|
||||
}
|
||||
|
||||
static void cacheTermDestroy(CacheTerm* ct) {
|
||||
static void indexCacheTermDestroy(CacheTerm* ct) {
|
||||
if (ct == NULL) { return; }
|
||||
free(ct->colVal);
|
||||
free(ct);
|
||||
}
|
||||
static char* getIndexKey(const void* pData) {
|
||||
static char* indexCacheTermGet(const void* pData) {
|
||||
CacheTerm* p = (CacheTerm*)pData;
|
||||
return (char*)p;
|
||||
}
|
||||
|
||||
static int32_t compareKey(const void* l, const void* r) {
|
||||
static int32_t indexCacheTermCompare(const void* l, const void* r) {
|
||||
CacheTerm* lt = (CacheTerm*)l;
|
||||
CacheTerm* rt = (CacheTerm*)r;
|
||||
|
||||
|
@ -345,7 +344,8 @@ static MemTable* indexInternalCacheCreate(int8_t type) {
|
|||
MemTable* tbl = calloc(1, sizeof(MemTable));
|
||||
indexMemRef(tbl);
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
|
||||
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY,
|
||||
indexCacheTermGet);
|
||||
}
|
||||
return tbl;
|
||||
}
|
||||
|
@ -375,4 +375,7 @@ static bool indexCacheIteratorNext(Iterate* itera) {
|
|||
return next;
|
||||
}
|
||||
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; }
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
|
||||
// opt later
|
||||
return &iter->val;
|
||||
}
|
||||
|
|
|
@ -125,6 +125,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
|||
if (ctx->type == TMemory) {
|
||||
free(ctx->mem.buf);
|
||||
} else {
|
||||
ctx->flush(ctx);
|
||||
tfClose(ctx->file.fd);
|
||||
if (ctx->file.readOnly) {
|
||||
#ifdef USE_MMAP
|
||||
|
|
|
@ -48,7 +48,7 @@ class FstWriter {
|
|||
|
||||
class FstReadMemory {
|
||||
public:
|
||||
FstReadMemory(size_t size, const std::string& fileName = fileName) {
|
||||
FstReadMemory(size_t size, const std::string& fileName = "/tmp/tindex.tindex") {
|
||||
tfInit();
|
||||
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
||||
_w = fstCountingWriterCreate(_wc);
|
||||
|
@ -307,7 +307,7 @@ void validateTFile(char* arg) {
|
|||
tfCleanup();
|
||||
}
|
||||
int main(int argc, char* argv[]) {
|
||||
// tool to check all kind of fst test
|
||||
// tool to check all kind of fst test
|
||||
// if (argc > 1) { validateTFile(argv[1]); }
|
||||
// checkFstCheckIterator();
|
||||
// checkFstLongTerm();
|
||||
|
|
|
@ -115,9 +115,8 @@ typedef struct SSchJob {
|
|||
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
|
||||
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
|
||||
|
||||
#define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__)
|
||||
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
|
||||
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
|
||||
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
|
||||
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
|
||||
|
||||
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
|
|
|
@ -280,8 +280,6 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("push to %s list", "execTasks");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -296,8 +294,6 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("push to %s list", "succTasks");
|
||||
|
||||
*moved = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -313,8 +309,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("push to %s list", "failTasks");
|
||||
|
||||
*moved = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -389,7 +383,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
|
|||
|
||||
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
|
||||
if (!moved) {
|
||||
SCH_TASK_ELOG("task may already moved, status:%d", task->status);
|
||||
SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -463,11 +457,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
|
|||
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
|
||||
|
||||
if (!needRetry) {
|
||||
SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
|
||||
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
|
||||
|
||||
SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
|
||||
if (!moved) {
|
||||
SCH_TASK_ELOG("task may already moved, status:%d", task->status);
|
||||
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
|
||||
}
|
||||
|
||||
if (SCH_TASK_NEED_WAIT_ALL(task)) {
|
||||
|
@ -506,6 +500,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
|
|||
goto _task_error;
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_SUBMIT_RSP: {
|
||||
|
@ -584,25 +579,19 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
|
|||
int32_t code = 0;
|
||||
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
|
||||
|
||||
SSchJob **pjob = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
|
||||
if (NULL == pjob || NULL == (*pjob)) {
|
||||
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
|
||||
if (NULL == job || NULL == (*job)) {
|
||||
qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SSchJob *job = *pjob;
|
||||
|
||||
SSchTask **ptask = taosHashGet(job->execTasks, &pParam->taskId, sizeof(pParam->taskId));
|
||||
if (NULL == ptask || NULL == (*ptask)) {
|
||||
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
|
||||
if (NULL == task || NULL == (*task)) {
|
||||
qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SSchTask *task = *ptask;
|
||||
|
||||
SCH_TASK_DLOG("Got msg:%d, rspCode:%d", msgType, rspCode);
|
||||
|
||||
schProcessRspMsg(job, task, msgType, pMsg->pData, pMsg->len, rspCode);
|
||||
schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode);
|
||||
|
||||
_return:
|
||||
tfree(param);
|
||||
|
@ -832,7 +821,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
|||
SCH_ERR_RET(schSetTaskCondidateAddrs(job, task));
|
||||
|
||||
if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) {
|
||||
SCH_TASK_ELOG("no valid condidate node for task:%"PRIx64, task->taskId);
|
||||
SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue