fix:fix error.
This commit is contained in:
parent
01f6f74fb9
commit
a9b18d203d
|
@ -59,7 +59,7 @@ typedef struct SDataSinkMgtCfg {
|
|||
uint32_t maxDataBlockNumPerQuery;
|
||||
} SDataSinkMgtCfg;
|
||||
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg);
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI);
|
||||
|
||||
typedef struct SInputData {
|
||||
const struct SSDataBlock* pData;
|
||||
|
|
|
@ -118,7 +118,7 @@ const void *metaGetTableTagVal(const void *tag, int16_t type, STagVal *tagVal);
|
|||
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
|
||||
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
|
||||
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid);
|
||||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
|
||||
bool metaIsTableExist(void* pVnode, tb_uid_t uid);
|
||||
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
|
||||
|
@ -192,7 +192,7 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList,
|
|||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUids);
|
||||
void *tsdbCacherowsReaderClose(void *pReader);
|
||||
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||
int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||
|
|
|
@ -139,10 +139,10 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
|
|||
return uid;
|
||||
}
|
||||
|
||||
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
|
||||
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
|
||||
int code = 0;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, (SMeta *)meta, 0);
|
||||
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
|
||||
code = metaReaderGetTableEntryByUid(&mr, uid);
|
||||
if (code < 0) {
|
||||
metaReaderClear(&mr);
|
||||
|
@ -170,10 +170,10 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) {
|
||||
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
|
||||
int code = 0;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, (SMeta *)meta, 0);
|
||||
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
|
||||
|
||||
SMetaReader *pReader = &mr;
|
||||
|
||||
|
@ -191,10 +191,10 @@ int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType) {
|
||||
int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) {
|
||||
int code = 0;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, (SMeta *)meta, 0);
|
||||
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
|
||||
|
||||
code = metaGetTableEntryByName(&mr, tbName);
|
||||
if (code == 0) *tbType = mr.me.type;
|
||||
|
|
|
@ -5373,9 +5373,9 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
|||
return rows;
|
||||
}
|
||||
|
||||
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
|
||||
int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
|
||||
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
|
@ -5405,7 +5405,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
|
|||
metaReaderClear(&mr);
|
||||
|
||||
// get the newest table schema version
|
||||
code = metaGetTbTSchemaEx(pVnode->pMeta, *suid, uid, -1, pSchema);
|
||||
code = metaGetTbTSchemaEx(((SVnode*)pVnode)->pMeta, *suid, uid, -1, pSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -78,6 +78,8 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
|||
pMeta->getTableUidByName = metaGetTableUidByName;
|
||||
pMeta->getTableTypeByName = metaGetTableTypeByName;
|
||||
pMeta->getTableNameByUid = metaGetTableNameByUid;
|
||||
|
||||
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
|
||||
}
|
||||
|
||||
void initTqAPI(SStoreTqReader* pTq) {
|
||||
|
|
|
@ -30,7 +30,7 @@ struct SDataSinkHandle;
|
|||
|
||||
typedef struct SDataSinkManager {
|
||||
SDataSinkMgtCfg cfg;
|
||||
SStorageAPI storeFn;
|
||||
SStorageAPI* pAPI;
|
||||
} SDataSinkManager;
|
||||
|
||||
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
||||
|
|
|
@ -224,7 +224,7 @@ typedef struct STableScanInfo {
|
|||
int8_t assignBlockUid;
|
||||
bool hasGroupByTag;
|
||||
bool countOnly;
|
||||
TsdReader readerAPI;
|
||||
// TsdReader readerAPI;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
|
|
|
@ -429,7 +429,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
inserter->explain = pInserterNode->explain;
|
||||
|
||||
int64_t suid = 0;
|
||||
int32_t code = pManager->storeFn.metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
||||
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
||||
if (code) {
|
||||
destroyDataSinker((SDataSinkHandle*)inserter);
|
||||
taosMemoryFree(inserter);
|
||||
|
|
|
@ -21,8 +21,9 @@
|
|||
static SDataSinkManager gDataSinkManager = {0};
|
||||
SDataSinkStat gDataSinkStat = {0};
|
||||
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg) {
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) {
|
||||
gDataSinkManager.cfg = *cfg;
|
||||
gDataSinkManager.pAPI = pAPI;
|
||||
return 0; // to avoid compiler eror
|
||||
}
|
||||
|
||||
|
|
|
@ -507,7 +507,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
}
|
||||
|
||||
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
|
||||
code = dsDataSinkMgtInit(&cfg);
|
||||
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
|
||||
goto _error;
|
||||
|
|
|
@ -929,7 +929,7 @@ static void destroyTableScanOperatorInfo(void* param) {
|
|||
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
|
||||
blockDataDestroy(pTableScanInfo->pResBlock);
|
||||
taosHashCleanup(pTableScanInfo->pIgnoreTables);
|
||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->readerAPI);
|
||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
@ -975,7 +975,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
||||
pInfo->sample.seed = taosGetTimestampSec();
|
||||
|
||||
pInfo->readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||
// blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
|
@ -1075,7 +1075,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
|
|||
pTableScanInfo->base.cond.endVersion = version;
|
||||
pTableScanInfo->scanTimes = 0;
|
||||
pTableScanInfo->currentGroupId = -1;
|
||||
pTableScanInfo->readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||
pTableScanInfo->base.dataReader = NULL;
|
||||
}
|
||||
|
||||
|
@ -1250,7 +1250,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
|||
*pRowIndex = 0;
|
||||
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
pTableScanInfo->readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||
pTableScanInfo->base.dataReader = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -3045,6 +3045,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||
pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
pInfo->base.scanFlag = MAIN_SCAN;
|
||||
pInfo->base.readHandle = *readHandle;
|
||||
|
|
Loading…
Reference in New Issue