fix: set function ptr.
This commit is contained in:
parent
1e22d875b3
commit
e24ce2111f
|
@ -196,8 +196,8 @@ typedef struct {
|
||||||
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
|
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
|
||||||
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
|
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
|
||||||
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
||||||
// int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
||||||
// SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
|
// SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
|
||||||
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
||||||
// int32_t destroySnapContext(SSnapContext *ctx);
|
// int32_t destroySnapContext(SSnapContext *ctx);
|
||||||
|
|
||||||
|
@ -310,15 +310,15 @@ typedef struct SStoreTqReader {
|
||||||
|
|
||||||
typedef struct SStoreSnapshotFn {
|
typedef struct SStoreSnapshotFn {
|
||||||
/*
|
/*
|
||||||
int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
||||||
SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
|
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
|
||||||
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
||||||
int32_t destroySnapContext(SSnapContext *ctx);
|
int32_t destroySnapContext(SSnapContext *ctx);
|
||||||
*/
|
*/
|
||||||
int32_t (*createSnapshot)();
|
int32_t (*createSnapshot)();
|
||||||
void (*destroySnapshot)();
|
int32_t (*destroySnapshot)();
|
||||||
SMetaTableInfo (*getTableInfoFromSnapshot)();
|
SMetaTableInfo (*getMetaTableInfoFromSnapshot)();
|
||||||
int32_t (*getMetaInfoFromSnapshot)();
|
int32_t (*getTableInfoFromSnapshot)();
|
||||||
} SStoreSnapshotFn;
|
} SStoreSnapshotFn;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -282,8 +282,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
|
||||||
|
|
||||||
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
|
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
|
||||||
SSnapContext **ctxRet);
|
SSnapContext **ctxRet);
|
||||||
int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
||||||
SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
|
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
|
||||||
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
|
||||||
int32_t destroySnapContext(SSnapContext *ctx);
|
int32_t destroySnapContext(SSnapContext *ctx);
|
||||||
|
|
||||||
|
|
|
@ -466,7 +466,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
|
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
void* pKey = NULL;
|
void* pKey = NULL;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
|
@ -598,7 +598,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, in
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx) {
|
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext* ctx) {
|
||||||
SMetaTableInfo result = {0};
|
SMetaTableInfo result = {0};
|
||||||
void* pKey = NULL;
|
void* pKey = NULL;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
|
@ -619,7 +619,7 @@ SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx) {
|
||||||
|
|
||||||
int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
|
int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
metaDebug("tmqsnap getUidfromSnapShot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version);
|
metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
|
tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
|
||||||
|
|
|
@ -304,6 +304,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
.version = handle.snapshotVer
|
.version = handle.snapshotVer
|
||||||
};
|
};
|
||||||
|
|
||||||
|
initStorageAPI(&reader.api);
|
||||||
|
|
||||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
handle.execHandle.task =
|
handle.execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
|
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
|
||||||
|
|
|
@ -26,6 +26,7 @@ static void initMetaReaderAPI(SStoreMetaReader* pMetaReader);
|
||||||
static void initMetaFilterAPI(SMetaDataFilterAPI* pFilter);
|
static void initMetaFilterAPI(SMetaDataFilterAPI* pFilter);
|
||||||
static void initFunctionStateStore(SFunctionStateStore* pStore);
|
static void initFunctionStateStore(SFunctionStateStore* pStore);
|
||||||
static void initCacheFn(SStoreCacheReader* pCache);
|
static void initCacheFn(SStoreCacheReader* pCache);
|
||||||
|
static void initSnapshotFn(SStoreSnapshotFn* pSnapshot);
|
||||||
|
|
||||||
void initStorageAPI(SStorageAPI* pAPI) {
|
void initStorageAPI(SStorageAPI* pAPI) {
|
||||||
initTsdbReaderAPI(&pAPI->tsdReader);
|
initTsdbReaderAPI(&pAPI->tsdReader);
|
||||||
|
@ -36,6 +37,7 @@ void initStorageAPI(SStorageAPI* pAPI) {
|
||||||
initMetaFilterAPI(&pAPI->metaFilter);
|
initMetaFilterAPI(&pAPI->metaFilter);
|
||||||
initFunctionStateStore(&pAPI->functionStore);
|
initFunctionStateStore(&pAPI->functionStore);
|
||||||
initCacheFn(&pAPI->cacheFn);
|
initCacheFn(&pAPI->cacheFn);
|
||||||
|
initSnapshotFn(&pAPI->snapshotFn);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initTsdbReaderAPI(TsdReader* pReader) {
|
void initTsdbReaderAPI(TsdReader* pReader) {
|
||||||
|
@ -69,7 +71,6 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
||||||
|
|
||||||
pMeta->getBasicInfo = vnodeGetInfo;
|
pMeta->getBasicInfo = vnodeGetInfo;
|
||||||
pMeta->getNumOfChildTables = metaGetStbStats;
|
pMeta->getNumOfChildTables = metaGetStbStats;
|
||||||
// pMeta->getNumOfRowsInMem = tsdbGetNumOfRowsInMemTable;
|
|
||||||
|
|
||||||
pMeta->getChildTableList = vnodeGetCtbIdList;
|
pMeta->getChildTableList = vnodeGetCtbIdList;
|
||||||
|
|
||||||
|
@ -224,4 +225,11 @@ void initCacheFn(SStoreCacheReader* pCache) {
|
||||||
pCache->closeReader = tsdbCacherowsReaderClose;
|
pCache->closeReader = tsdbCacherowsReaderClose;
|
||||||
pCache->retrieveRows = tsdbRetrieveCacheRows;
|
pCache->retrieveRows = tsdbRetrieveCacheRows;
|
||||||
pCache->reuseReader = tsdbReuseCacherowsReader;
|
pCache->reuseReader = tsdbReuseCacherowsReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
void initSnapshotFn(SStoreSnapshotFn* pSnapshot) {
|
||||||
|
pSnapshot->createSnapshot = setForSnapShot;
|
||||||
|
pSnapshot->destroySnapshot = destroySnapContext;
|
||||||
|
pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot;
|
||||||
|
pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot;
|
||||||
}
|
}
|
|
@ -1190,7 +1190,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.getTableInfoFromSnapshot(sContext);
|
SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext);
|
||||||
pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
|
pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
|
||||||
pInfo->dataReader = NULL;
|
pInfo->dataReader = NULL;
|
||||||
|
|
||||||
|
|
|
@ -380,17 +380,18 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
||||||
STableListInfo* pTableListInfo = tableListCreate();
|
STableListInfo* pTableListInfo = tableListCreate();
|
||||||
|
|
||||||
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
|
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
|
||||||
SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
|
SArray* pList = taosArrayInit(4, sizeof(uint64_t));
|
||||||
int32_t code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
|
int32_t code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
taosArrayDestroy(pList);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(pList);
|
size_t num = taosArrayGetSize(pList);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
STableKeyInfo* p = taosArrayGet(pList, i);
|
uint64_t* id = taosArrayGet(pList, i);
|
||||||
tableListAddTableInfo(pTableListInfo, p->uid, 0);
|
tableListAddTableInfo(pTableListInfo, *id, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pList);
|
taosArrayDestroy(pList);
|
||||||
|
|
|
@ -2185,7 +2185,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMetaTableInfo mtInfo = pAPI->snapshotFn.getTableInfoFromSnapshot(pInfo->sContext);
|
SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext);
|
||||||
STqOffsetVal offset = {0};
|
STqOffsetVal offset = {0};
|
||||||
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
|
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
|
||||||
qDebug("tmqsnap read snapshot done, change to get data from wal");
|
qDebug("tmqsnap read snapshot done, change to get data from wal");
|
||||||
|
@ -2203,8 +2203,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
int16_t type = 0;
|
int16_t type = 0;
|
||||||
int64_t uid = 0;
|
int64_t uid = 0;
|
||||||
if (pAPI->snapshotFn.getMetaInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) {
|
if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) {
|
||||||
qError("tmqsnap getMetafromSnapShot error");
|
qError("tmqsnap getTableInfoFromSnapshot error");
|
||||||
taosMemoryFreeClear(data);
|
taosMemoryFreeClear(data);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2253,6 +2253,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
||||||
|
|
||||||
pInfo->pTableListInfo = tableListCreate();
|
pInfo->pTableListInfo = tableListCreate();
|
||||||
pInfo->vnode = pHandle->vnode;
|
pInfo->vnode = pHandle->vnode;
|
||||||
|
pInfo->pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
pInfo->sContext = pHandle->sContext;
|
pInfo->sContext = pHandle->sContext;
|
||||||
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
|
@ -3470,7 +3471,7 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S
|
||||||
|
|
||||||
int64_t ctbNum = 0;
|
int64_t ctbNum = 0;
|
||||||
int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum);
|
int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum);
|
||||||
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
|
fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destoryTableCountScanOperator(void* param) {
|
static void destoryTableCountScanOperator(void* param) {
|
||||||
|
|
Loading…
Reference in New Issue