refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-04-27 16:06:06 +08:00
parent 17a5691afb
commit 91f9b58f9a
6 changed files with 27 additions and 32 deletions

View File

@ -175,11 +175,8 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8 #define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
@ -190,7 +187,10 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader, const char *idstr); uint64_t suid, void **pReader, const char *idstr);
@ -231,26 +231,21 @@ typedef struct SSnapContext {
} SSnapContext; } SSnapContext;
typedef struct STqReader { typedef struct STqReader {
SPackedData msg2; SPackedData msg2;
SSubmitReq2 submit;
SSubmitReq2 submit; int32_t nextBlk;
int32_t nextBlk; int64_t lastBlkUid;
SWalReader *pWalReader;
int64_t lastBlkUid; SMeta *pVnodeMeta;
SHashObj *tbIdHash;
SWalReader *pWalReader; SArray *pColIdList; // SArray<int16_t>
SMeta *pVnodeMeta;
SHashObj *tbIdHash;
SArray *pColIdList; // SArray<int16_t>
int32_t cachedSchemaVer; int32_t cachedSchemaVer;
int64_t cachedSchemaSuid; int64_t cachedSchemaSuid;
SSchemaWrapper *pSchemaWrapper; SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema; STSchema *pSchema;
} STqReader; } STqReader;
STqReader *tqOpenReader(SVnode *pVnode); STqReader *tqReaderOpen(SVnode *pVnode);
void tqCloseReader(STqReader *); void tqCloseReader(STqReader *);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
@ -266,7 +261,7 @@ int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, i
bool tqNextBlockImpl(STqReader *pReader); bool tqNextBlockImpl(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);

View File

@ -504,7 +504,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.pTqReader = tqOpenReader(pVnode); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
@ -523,7 +523,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
} }
pHandle->execHandle.pTqReader = tqOpenReader(pVnode); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);

View File

@ -328,7 +328,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
} }
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode); handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext)); (SSnapContext**)(&reader.sContext));
@ -343,7 +343,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
} }
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode); handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);

View File

@ -249,7 +249,7 @@ END:
return code; return code;
} }
STqReader* tqOpenReader(SVnode* pVnode) { STqReader* tqReaderOpen(SVnode* pVnode) {
STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader)); STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
if (pReader == NULL) { if (pReader == NULL) {
return NULL; return NULL;
@ -653,7 +653,7 @@ FAIL:
return -1; return -1;
} }
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);

View File

@ -207,7 +207,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
@ -266,7 +266,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {

View File

@ -1855,7 +1855,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) { if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); uint64_t version = tsdbGetReaderMaxVersion(pTableScanInfo->base.dataReader);
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
@ -2021,7 +2021,7 @@ FETCH_NEXT_BLOCK:
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) { if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); uint64_t version = tsdbGetReaderMaxVersion(pTableScanInfo->base.dataReader);
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
@ -2426,7 +2426,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTqReader) { if (pHandle->initTqReader) {
ASSERT(pHandle->tqReader == NULL); ASSERT(pHandle->tqReader == NULL);
pInfo->tqReader = tqOpenReader(pHandle->vnode); pInfo->tqReader = tqReaderOpen(pHandle->vnode);
ASSERT(pInfo->tqReader); ASSERT(pInfo->tqReader);
} else { } else {
ASSERT(pHandle->tqReader); ASSERT(pHandle->tqReader);