Merge pull request #12032 from taosdata/feature/TD-13066-3.0
feat: migrate tsdb read interface to vnode
This commit is contained in:
commit
0bac6edfd6
|
@ -92,16 +92,16 @@ int metaTbCursorNext(SMTbCursor *pTbCur);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
typedef struct STsdb STsdb;
|
// typedef struct STsdb STsdb;
|
||||||
typedef void *tsdbReaderT;
|
typedef void *tsdbReaderT;
|
||||||
|
|
||||||
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
||||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||||
|
|
||||||
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
|
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
|
||||||
void *pMemRef);
|
void *pMemRef);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
||||||
|
@ -116,7 +116,7 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn
|
||||||
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
|
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
|
||||||
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
||||||
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
||||||
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
|
|
||||||
|
|
|
@ -87,14 +87,19 @@ int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
|
||||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb);
|
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb);
|
||||||
int tsdbClose(STsdb* pTsdb);
|
int tsdbClose(STsdb* pTsdb);
|
||||||
int tsdbBegin(STsdb* pTsdb);
|
int tsdbBegin(STsdb* pTsdb);
|
||||||
int tsdbCommit(STsdb* pTsdb);
|
int tsdbCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
|
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
|
||||||
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
|
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
|
||||||
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
|
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
|
||||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||||
|
tsdbReaderT* tsdbQueryTablesT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
|
uint64_t taskId);
|
||||||
|
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
|
void* pMemRef);
|
||||||
|
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
||||||
|
|
|
@ -422,7 +422,7 @@ _end:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
tsdbReaderT* tsdbQueryTablesT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
|
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
|
@ -535,7 +535,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGrou
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(tsdb, pCond, groupList, qId, taskId);
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTablesT(tsdb, pCond, groupList, qId, taskId);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -555,8 +555,8 @@ tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGrou
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
|
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
|
||||||
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
|
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -634,7 +634,7 @@ tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, SQueryTableDataCond* pCon
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(tsdb, pCond, pNew, qId, taskId);
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTablesT(tsdb, pCond, pNew, qId, taskId);
|
||||||
pTsdbReadHandle->loadExternalRow = true;
|
pTsdbReadHandle->loadExternalRow = true;
|
||||||
pTsdbReadHandle->currentLoadExternalRows = true;
|
pTsdbReadHandle->currentLoadExternalRows = true;
|
||||||
|
|
||||||
|
@ -3717,7 +3717,7 @@ _error:
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
|
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
|
||||||
if (tsdbRLockRepoMeta(tsdb) < 0) {
|
if (tsdbRLockRepoMeta(tsdb) < 0) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,4 +147,24 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
|
||||||
if (vgId) {
|
if (vgId) {
|
||||||
*vgId = TD_VID(pVnode);
|
*vgId = TD_VID(pVnode);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wrapper of tsdb read interface
|
||||||
|
// TODO: use FORCE_INLINE if possible later
|
||||||
|
tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
||||||
|
uint64_t taskId) {
|
||||||
|
return tsdbQueryTablesT(pVnode->pTsdb, pCond, tableInfoGroup, qId, taskId);
|
||||||
|
}
|
||||||
|
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
|
||||||
|
void *pMemRef) {
|
||||||
|
#if 0
|
||||||
|
return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef);
|
||||||
|
#endif
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo) {
|
||||||
|
#if 0
|
||||||
|
return tsdbGetTableGroupFromIdListT(pVnode->pTsdb, pTableIdList, pGroupInfo);
|
||||||
|
#endif
|
||||||
|
return 0;
|
||||||
}
|
}
|
|
@ -141,8 +141,10 @@ _err:
|
||||||
|
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
vTrace("message in vnode query queue is processing");
|
vTrace("message in vnode query queue is processing");
|
||||||
|
#if 0
|
||||||
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode};
|
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode};
|
||||||
|
#endif
|
||||||
|
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode};
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_QUERY:
|
case TDMT_VND_QUERY:
|
||||||
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
|
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
|
||||||
|
|
|
@ -1908,7 +1908,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
if (!isUdaf) {
|
if (!isUdaf) {
|
||||||
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
||||||
} else {
|
} else {
|
||||||
char *udfName = pExpr->pExpr->_function.pFunctNode->functionName;
|
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
|
||||||
strncpy(pCtx->udfName, udfName, strlen(udfName));
|
strncpy(pCtx->udfName, udfName, strlen(udfName));
|
||||||
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
||||||
}
|
}
|
||||||
|
@ -6925,8 +6925,10 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
|
return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
|
||||||
|
#endif
|
||||||
|
return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId);
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
Loading…
Reference in New Issue