fix: set correct function ptr.
This commit is contained in:
parent
0966213847
commit
c3be7b14be
|
@ -42,7 +42,6 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
void* tqReader;
|
||||
// void* meta;
|
||||
void* config;
|
||||
void* vnode;
|
||||
void* mnd;
|
||||
|
|
|
@ -84,7 +84,7 @@ typedef struct SMetaReader {
|
|||
SMetaEntry me;
|
||||
void * pBuf;
|
||||
int32_t szBuf;
|
||||
struct SStorageAPI *storageAPI;
|
||||
struct SStoreMeta* pAPI;
|
||||
} SMetaReader;
|
||||
|
||||
typedef struct SMTbCursor {
|
||||
|
@ -256,7 +256,7 @@ typedef struct SStoreCacheReader {
|
|||
void *(*closeReader)(void *pReader);
|
||||
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUidList);
|
||||
void (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||
int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||
} SStoreCacheReader;
|
||||
|
||||
/*------------------------------------------------------------------------------------------------------------------*/
|
||||
|
@ -290,6 +290,7 @@ typedef struct SStoreTqReader {
|
|||
int32_t (*tqRetrieveBlock)();
|
||||
bool (*tqReaderNextBlockInWal)();
|
||||
bool (*tqNextBlockImpl)(); // todo remove it
|
||||
SSDataBlock* (*tqGetResultBlock)();
|
||||
|
||||
void (*tqReaderSetColIdList)();
|
||||
int32_t (*tqReaderSetQueryTableList)();
|
||||
|
@ -345,14 +346,6 @@ int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey,
|
|||
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t payloadLen);
|
||||
*/
|
||||
typedef struct SStoreMetaReader {
|
||||
void (*initReader)(SMetaReader *pReader, void *pMeta, int32_t flags);
|
||||
void (*clearReader)(SMetaReader *pReader);
|
||||
void (*readerReleaseLock)(SMetaReader *pReader);
|
||||
int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name);
|
||||
int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid);
|
||||
} SStoreMetaReader;
|
||||
|
||||
typedef struct SStoreMeta {
|
||||
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor
|
||||
|
@ -387,7 +380,7 @@ int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, in
|
|||
void *(*storeGetIndexInfo)();
|
||||
void *(*getInvertIndex)(void* pVnode);
|
||||
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
|
||||
int32_t (*storeGetTableList)(); // vnodeGetStbIdList & vnodeGetAllTableList
|
||||
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
|
||||
void *storeGetVersionRange;
|
||||
void *storeGetLastTimestamp;
|
||||
|
||||
|
@ -405,9 +398,14 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
|
|||
*/
|
||||
} SStoreMeta;
|
||||
|
||||
|
||||
|
||||
|
||||
typedef struct SStoreMetaReader {
|
||||
void (*initReader)(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI);
|
||||
void (*clearReader)(SMetaReader *pReader);
|
||||
void (*readerReleaseLock)(SMetaReader *pReader);
|
||||
int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name);
|
||||
int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid);
|
||||
} SStoreMetaReader;
|
||||
|
||||
typedef struct SUpdateInfo {
|
||||
SArray *pTsBuckets;
|
||||
|
|
|
@ -53,6 +53,8 @@ int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
|||
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||
#endif
|
||||
|
||||
void initStreamStateAPI(SStorageAPI* pAPI);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -62,8 +62,7 @@ FAIL:
|
|||
}
|
||||
|
||||
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
|
||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
|
||||
pTask->refCnt = 1;
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
|
@ -88,6 +87,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
|||
|
||||
int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
|
||||
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
|
||||
initStreamStateAPI(&handle.api);
|
||||
|
||||
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
|
||||
ASSERT(pTask->exec.pExecutor);
|
||||
|
|
|
@ -69,6 +69,7 @@ int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
|||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
||||
void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);
|
||||
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
|
||||
int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList);
|
||||
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
|
||||
int32_t vnodeIsCatchUp(SVnode *pVnode);
|
||||
ESyncRole vnodeGetRole(SVnode *pVnode);
|
||||
|
@ -105,7 +106,7 @@ typedef struct SMetaEntry SMetaEntry;
|
|||
|
||||
#define META_READER_NOLOCK 0x1
|
||||
|
||||
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags);
|
||||
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI);
|
||||
void metaReaderReleaseLock(SMetaReader *pReader);
|
||||
void metaReaderClear(SMetaReader *pReader);
|
||||
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
|
@ -257,6 +258,7 @@ int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
|
|||
bool tqNextBlockInWal(STqReader *pReader, const char *idstr);
|
||||
bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
|
||||
SWalReader* tqGetWalReader(STqReader* pReader);
|
||||
SSDataBlock* tqGetResultBlock (STqReader* pReader);
|
||||
|
||||
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id);
|
||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||
|
|
|
@ -17,14 +17,14 @@
|
|||
#include "osMemory.h"
|
||||
#include "tencode.h"
|
||||
|
||||
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags) {
|
||||
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) {
|
||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
||||
metaReaderInit(pReader, pMeta, flags);
|
||||
pReader->pAPI = pAPI;
|
||||
}
|
||||
|
||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
|
||||
memset(pReader, 0, sizeof(*pReader));
|
||||
pReader->flags = flags;
|
||||
pReader->pMeta = pMeta;
|
||||
if (pReader->pMeta && !(flags & META_READER_NOLOCK)) {
|
||||
metaRLock(pMeta);
|
||||
|
|
|
@ -446,6 +446,9 @@ SWalReader* tqGetWalReader(STqReader* pReader) {
|
|||
return pReader->pWalReader;
|
||||
}
|
||||
|
||||
SSDataBlock* tqGetResultBlock (STqReader* pReader) {
|
||||
return pReader->pResBlock;
|
||||
}
|
||||
|
||||
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||
if (pReader->msg.msgStr == NULL) {
|
||||
|
|
|
@ -25,6 +25,7 @@ static void initStateStoreAPI(SStateStore* pStore);
|
|||
static void initMetaReaderAPI(SStoreMetaReader* pMetaReader);
|
||||
static void initMetaFilterAPI(SMetaDataFilterAPI* pFilter);
|
||||
static void initFunctionStateStore(SFunctionStateStore* pStore);
|
||||
static void initCacheFn(SStoreCacheReader* pCache);
|
||||
|
||||
void initStorageAPI(SStorageAPI* pAPI) {
|
||||
initTsdbReaderAPI(&pAPI->tsdReader);
|
||||
|
@ -34,6 +35,7 @@ void initStorageAPI(SStorageAPI* pAPI) {
|
|||
initMetaReaderAPI(&pAPI->metaReaderFn);
|
||||
initMetaFilterAPI(&pAPI->metaFilter);
|
||||
initFunctionStateStore(&pAPI->functionStore);
|
||||
initCacheFn(&pAPI->cacheFn);
|
||||
}
|
||||
|
||||
void initTsdbReaderAPI(TsdReader* pReader) {
|
||||
|
@ -83,6 +85,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
|||
pMeta->getTableNameByUid = metaGetTableNameByUid;
|
||||
|
||||
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
|
||||
pMeta->storeGetTableList = vnodeGetTableList;
|
||||
}
|
||||
|
||||
void initTqAPI(SStoreTqReader* pTq) {
|
||||
|
@ -109,6 +112,8 @@ void initTqAPI(SStoreTqReader* pTq) {
|
|||
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
|
||||
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
|
||||
pTq->tqGetResultBlock = tqGetResultBlock;
|
||||
|
||||
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
}
|
||||
|
||||
|
@ -214,3 +219,10 @@ void initFunctionStateStore(SFunctionStateStore* pStore) {
|
|||
pStore->streamStateFuncPut = streamStateFuncPut;
|
||||
pStore->streamStateFuncGet = streamStateFuncGet;
|
||||
}
|
||||
|
||||
void initCacheFn(SStoreCacheReader* pCache) {
|
||||
pCache->openReader = tsdbCacherowsReaderOpen;
|
||||
pCache->closeReader = tsdbCacherowsReaderClose;
|
||||
pCache->retrieveRows = tsdbRetrieveCacheRows;
|
||||
pCache->reuseReader = tsdbReuseCacherowsReader;
|
||||
}
|
|
@ -431,6 +431,14 @@ void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t* num
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList) {
|
||||
if (type == TSDB_SUPER_TABLE) {
|
||||
return vnodeGetStbIdList(pVnode, 0, pList);
|
||||
} else {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
|
||||
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid, 1);
|
||||
|
||||
|
|
|
@ -263,7 +263,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
|||
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = pSColumnNode->colId;
|
||||
const char* p = mr->storageAPI->metaFn.extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
|
||||
const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
|
||||
if (p == NULL) {
|
||||
res->node.resType.type = TSDB_DATA_TYPE_NULL;
|
||||
} else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||
|
@ -306,7 +306,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SMetaReader mr = {0};
|
||||
|
||||
pAPI->metaReaderFn.initReader(&mr, metaHandle, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, metaHandle, 0, &pAPI->metaFn);
|
||||
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
@ -1168,7 +1168,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode,
|
|||
SStorageAPI* pAPI) {
|
||||
SMetaReader mr = {0};
|
||||
|
||||
pAPI->metaReaderFn.initReader(&mr, pVnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pVnode, 0, &pAPI->metaFn);
|
||||
if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) { // table not exist
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
|
|
|
@ -341,7 +341,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
|
||||
// let's discard the tables those are not created according to the queried super table.
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
for (int32_t i = 0; i < numOfUids; ++i) {
|
||||
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
||||
|
||||
|
@ -1091,12 +1091,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
|
||||
pScanBaseInfo->dataReader = NULL;
|
||||
|
||||
ASSERT(0);
|
||||
// walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset);
|
||||
// if (tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||
// qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
||||
// return -1;
|
||||
// }
|
||||
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
|
||||
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
|
||||
walReaderVerifyOffset(pWalReader, pOffset);
|
||||
if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||
qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
||||
return -1;
|
||||
}
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
// iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
|
||||
// those data are from the snapshot in tsdb, besides the data in the wal file.
|
||||
|
|
|
@ -381,9 +381,9 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
|||
|
||||
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
|
||||
SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||
int32_t code = pTaskInfo->storageAPI.metaFn.storeGetTableList(pHandle->vnode, pBlockNode->uid, pList);
|
||||
int32_t code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = terrno;
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
|||
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pAPI->metaFn);
|
||||
int32_t code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, pScanNode->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
|
||||
|
|
|
@ -531,7 +531,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
|
||||
// 1. check if it is existed in meta cache
|
||||
if (pCache == NULL) {
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn);
|
||||
code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
|
||||
|
@ -560,7 +560,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
|
||||
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
|
||||
if (h == NULL) {
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn);
|
||||
code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
|
@ -1330,9 +1330,9 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|||
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
int64_t version = pSrcBlock->info.version - 1;
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
|
||||
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver);
|
||||
// gap must be 0.
|
||||
SSessionKey startWin = {0};
|
||||
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
|
||||
|
@ -1378,13 +1378,13 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
int64_t version = pSrcBlock->info.version - 1;
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
|
||||
if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
|
||||
uint64_t srcUid = srcUidData[0];
|
||||
TSKEY startTs = srcStartTsCol[0];
|
||||
TSKEY endTs = srcEndTsCol[0];
|
||||
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
|
||||
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver);
|
||||
printDataBlock(pPreRes, "pre res");
|
||||
blockDataCleanup(pSrcBlock);
|
||||
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
|
||||
|
@ -1422,7 +1422,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
uint64_t srcUid = srcUidData[i];
|
||||
uint64_t groupId = srcGp[i];
|
||||
if (groupId == 0) {
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
|
||||
}
|
||||
TSKEY calStartTs = srcStartTsCol[i];
|
||||
colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
|
||||
|
@ -1459,13 +1459,13 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
int64_t version = pSrcBlock->info.version - 1;
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||
uint64_t srcUid = srcUidData[i];
|
||||
uint64_t groupId = srcGp[i];
|
||||
char* tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
|
||||
if (groupId == 0) {
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
|
||||
}
|
||||
if (pInfo->tbnameCalSup.pExprInfo) {
|
||||
void* parTbname = NULL;
|
||||
|
@ -1676,7 +1676,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
while (1) {
|
||||
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
|
||||
|
||||
SSDataBlock* pRes = NULL;
|
||||
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
|
||||
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
|
||||
|
||||
// curVersion move to next, so currentOffset = curVersion - 1
|
||||
|
@ -2558,7 +2558,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
|||
char str[512] = {0};
|
||||
int32_t count = 0;
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
|
||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
||||
doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
||||
|
@ -3185,6 +3185,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList*
|
|||
qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
|
||||
return code;
|
||||
}
|
||||
|
||||
supp->dbNameSlotId = -1;
|
||||
supp->stbNameSlotId = -1;
|
||||
supp->tbCountSlotId = -1;
|
||||
|
@ -3194,6 +3195,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList*
|
|||
qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tblCountScanGetCountSlotId(pseudoCols, supp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
|
||||
|
@ -3378,7 +3380,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
|
|||
if (pSupp->groupByStbName) {
|
||||
if (pInfo->stbUidList == NULL) {
|
||||
pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
|
||||
if (pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, 0, pInfo->stbUidList, TSDB_SUPER_TABLE) < 0) {
|
||||
if (pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList) < 0) {
|
||||
qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
|
||||
}
|
||||
}
|
||||
|
@ -3412,7 +3414,7 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
|
|||
|
||||
if (strlen(pSupp->dbNameFilter) != 0) {
|
||||
if (strlen(pSupp->stbNameFilter) != 0) {
|
||||
tb_uid_t uid = 0;
|
||||
uint64_t uid = 0;
|
||||
pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid);
|
||||
|
||||
int64_t numOfChildTables = 0;
|
||||
|
@ -3420,7 +3422,8 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
|
|||
|
||||
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
|
||||
} else {
|
||||
int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode);
|
||||
int64_t tbNumVnode = 0;
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &tbNumVnode, NULL);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
} else {
|
||||
|
@ -3444,6 +3447,7 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S
|
|||
|
||||
int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode);
|
||||
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
|
||||
|
||||
if (numOfTables != 0) {
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
|
||||
}
|
||||
|
@ -3456,18 +3460,16 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S
|
|||
|
||||
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
if (pSupp->groupByDbName) {
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, varDataVal(stbName));
|
||||
} else {
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", stbName);
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", varDataVal(stbName));
|
||||
}
|
||||
|
||||
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
|
||||
SMetaStbStats stats = {0};
|
||||
// metaGetStbStats(pInfo->readHandle.vnode, stbUid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
|
||||
int64_t ctbNum = 0;
|
||||
int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
|
||||
}
|
||||
|
||||
|
|
|
@ -466,7 +466,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
STR_TO_VARSTR(tableName, pInfo->req.filterTb);
|
||||
|
||||
SMetaReader smrTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -486,7 +486,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
if (smrTable.me.type == TSDB_CHILD_TABLE) {
|
||||
int64_t suid = smrTable.me.ctbEntry.suid;
|
||||
pAPI->metaReaderFn.clearReader(&smrTable);
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -569,7 +569,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
schemaRow = *(SSchemaWrapper**)schema;
|
||||
} else {
|
||||
SMetaReader smrSuperTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -658,7 +658,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
STR_TO_VARSTR(tableName, condTableName);
|
||||
|
||||
SMetaReader smrChildTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrChildTable, condTableName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -676,7 +676,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SMetaReader smrSuperTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK);
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByUid
|
||||
|
@ -715,7 +715,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
|
||||
|
||||
SMetaReader smrSuperTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1131,7 +1131,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
tb_uid_t* uid = taosArrayGet(pIdx->uids, i);
|
||||
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn);
|
||||
ret = pAPI->metaReaderFn.getTableEntryByUid(&mr, *uid);
|
||||
if (ret < 0) {
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
@ -1159,7 +1159,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
|
||||
|
||||
SMetaReader mr1 = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.vnode, META_READER_NOLOCK);
|
||||
pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
|
||||
|
||||
int64_t suid = mr.me.ctbEntry.suid;
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr1, suid);
|
||||
|
@ -1338,7 +1338,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
|
||||
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK);
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
|
||||
|
||||
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid);
|
||||
|
@ -2148,7 +2148,7 @@ static int32_t doGetTableRowSize(SReadHandle *pHandle, uint64_t uid, int32_t* ro
|
|||
*rowLen = 0;
|
||||
|
||||
SMetaReader mr = {0};
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn);
|
||||
int32_t code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
|
||||
|
|
Loading…
Reference in New Issue