feat: support fetching table tag value
This commit is contained in:
parent
4417c79cb1
commit
aa8d25f6e5
|
@ -82,6 +82,7 @@ typedef struct SCatalogReq {
|
|||
SArray* pUser; // element is SUserAuthInfo
|
||||
SArray* pTableIndex; // element is SNAME
|
||||
SArray* pTableCfg; // element is SNAME
|
||||
SArray* pTableTag; // element is SNAME
|
||||
bool qNodeRequired; // valid qnode
|
||||
bool dNodeRequired; // valid dnode
|
||||
bool svrVerRequired;
|
||||
|
|
|
@ -58,6 +58,7 @@ typedef enum {
|
|||
CTG_CI_OTHERTABLE_META,
|
||||
CTG_CI_TBL_SMA,
|
||||
CTG_CI_TBL_CFG,
|
||||
CTG_CI_TBL_TAG,
|
||||
CTG_CI_INDEX_INFO,
|
||||
CTG_CI_USER,
|
||||
CTG_CI_UDF,
|
||||
|
@ -110,6 +111,7 @@ typedef enum {
|
|||
CTG_TASK_GET_SVR_VER,
|
||||
CTG_TASK_GET_TB_META_BATCH,
|
||||
CTG_TASK_GET_TB_HASH_BATCH,
|
||||
CTG_TASK_GET_TB_TAG,
|
||||
} CTG_TASK_TYPE;
|
||||
|
||||
typedef enum {
|
||||
|
@ -186,6 +188,11 @@ typedef struct SCtgTbCfgCtx {
|
|||
SVgroupInfo* pVgInfo;
|
||||
} SCtgTbCfgCtx;
|
||||
|
||||
typedef struct SCtgTbTagCtx {
|
||||
SName* pName;
|
||||
SVgroupInfo* pVgInfo;
|
||||
} SCtgTbTagCtx;
|
||||
|
||||
typedef struct SCtgDbVgCtx {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
} SCtgDbVgCtx;
|
||||
|
@ -304,6 +311,7 @@ typedef struct SCtgJob {
|
|||
catalogCallback userFp;
|
||||
int32_t tbMetaNum;
|
||||
int32_t tbHashNum;
|
||||
int32_t tbTagNum;
|
||||
int32_t dbVgNum;
|
||||
int32_t udfNum;
|
||||
int32_t qnodeNum;
|
||||
|
|
|
@ -386,6 +386,37 @@ int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||
SName* name = (SName*)param;
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_TB_TAG;
|
||||
task.taskId = taskIdx;
|
||||
task.pJob = pJob;
|
||||
|
||||
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbTagCtx));
|
||||
if (NULL == task.taskCtx) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCtgTbTagCtx* ctx = task.taskCtx;
|
||||
ctx->pName = taosMemoryMalloc(sizeof(*name));
|
||||
if (NULL == ctx->pName) {
|
||||
taosMemoryFree(task.taskCtx);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(ctx->pName, name, sizeof(*name));
|
||||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
|
||||
ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
|
||||
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
@ -437,6 +468,15 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, con
|
|||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(name, dbFName);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbTagNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableTag, i);
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(name, dbFName);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
char* dbFName = taosHashIterate(pDb, NULL);
|
||||
|
@ -505,9 +545,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
|
||||
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
|
||||
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
||||
int32_t tbTagNum = (int32_t)ctgGetTablesReqNum(pReq->pTableTag);
|
||||
|
||||
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
|
||||
userNum + dbInfoNum + tbIndexNum + tbCfgNum;
|
||||
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum;
|
||||
|
||||
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
||||
if (NULL == *job) {
|
||||
|
@ -537,6 +578,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
pJob->tbIndexNum = tbIndexNum;
|
||||
pJob->tbCfgNum = tbCfgNum;
|
||||
pJob->svrVerNum = svrVerNum;
|
||||
pJob->tbTagNum = tbTagNum;
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
pJob->pBatchs =
|
||||
|
@ -604,6 +646,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_CFG, name, NULL));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbCfgNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableTag, i);
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TAG, name, NULL));
|
||||
}
|
||||
|
||||
|
||||
for (int32_t i = 0; i < indexNum; ++i) {
|
||||
char* indexName = taosArrayGet(pReq->pIndex, i);
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_INDEX_INFO, indexName, NULL));
|
||||
|
@ -1473,6 +1521,24 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleGetTbTagRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SCtgTask* pTask = tReq->pTask;
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
|
||||
STableCfgRsp* pRsp = (STableCfgRsp*)pTask->msgCtx.out;
|
||||
|
||||
TSWAP(pTask->res, pTask->msgCtx.out);
|
||||
|
||||
_return:
|
||||
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SCtgTask* pTask = tReq->pTask;
|
||||
|
@ -1935,6 +2001,45 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgLaunchGetTbTagTask(SCtgTask* pTask) {
|
||||
int32_t code = 0;
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
SCtgTbTagCtx* pCtx = (SCtgTbTagCtx*)pTask->taskCtx;
|
||||
SArray* pRes = NULL;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pCtx->pName, dbFName);
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||
if (NULL == pMsgCtx->pBatchs) {
|
||||
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||
}
|
||||
|
||||
if (NULL == pCtx->pVgInfo) {
|
||||
CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo));
|
||||
if (NULL == pCtx->pVgInfo) {
|
||||
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TAG, 1);
|
||||
|
||||
CTG_ERR_JRET(ctgGetTableCfgFromVnode(pCtg, pConn, pCtx->pName, pCtx->pVgInfo, NULL, pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (CTG_TASK_LAUNCHED == pTask->status) {
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) {
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
|
@ -2138,6 +2243,25 @@ _return:
|
|||
CTG_RET(ctgHandleTaskEnd(pTask, pTask->subRes.code));
|
||||
}
|
||||
|
||||
int32_t ctgGetTbTagCb(SCtgTask* pTask) {
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_JRET(pTask->subRes.code);
|
||||
|
||||
SCtgTbTagCtx* pCtx = (SCtgTbTagCtx*)pTask->taskCtx;
|
||||
SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res;
|
||||
|
||||
pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo));
|
||||
|
||||
CTG_RET(ctgLaunchGetTbTagTask(pTask));
|
||||
|
||||
_return:
|
||||
|
||||
CTG_RET(ctgHandleTaskEnd(pTask, pTask->subRes.code));
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t ctgGetUserCb(SCtgTask* pTask) {
|
||||
int32_t code = 0;
|
||||
|
@ -2197,6 +2321,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
|
|||
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
|
||||
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL},
|
||||
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
|
||||
{ctgInitGetTbTagTask, ctgLaunchGetTbTagTask, ctgHandleGetTbTagRsp, ctgDumpTbTagRes, NULL, NULL},
|
||||
};
|
||||
|
||||
int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
|
||||
|
|
Loading…
Reference in New Issue