refactor: rename variables
This commit is contained in:
parent
8ff3f8e25e
commit
a2e7c78d27
|
@ -158,7 +158,7 @@ typedef enum {
|
||||||
|
|
||||||
typedef union {
|
typedef union {
|
||||||
struct {
|
struct {
|
||||||
int32_t fileSetId;
|
int32_t filesetId;
|
||||||
} duration;
|
} duration;
|
||||||
} STsdReaderNotifyInfo;
|
} STsdReaderNotifyInfo;
|
||||||
|
|
||||||
|
@ -183,7 +183,7 @@ typedef struct TsdReader {
|
||||||
int64_t (*tsdReaderGetNumOfInMemRows)();
|
int64_t (*tsdReaderGetNumOfInMemRows)();
|
||||||
void (*tsdReaderNotifyClosing)();
|
void (*tsdReaderNotifyClosing)();
|
||||||
|
|
||||||
void (*tsdSetDurationOrder)(void* pReader);
|
void (*tsdSetFilesetDelimited)(void* pReader);
|
||||||
void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
|
void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
|
||||||
} TsdReader;
|
} TsdReader;
|
||||||
|
|
||||||
|
|
|
@ -118,6 +118,7 @@ typedef struct SScanLogicNode {
|
||||||
bool igLastNull;
|
bool igLastNull;
|
||||||
bool groupOrderScan;
|
bool groupOrderScan;
|
||||||
bool onlyMetaCtbIdx; // for tag scan with no tbname
|
bool onlyMetaCtbIdx; // for tag scan with no tbname
|
||||||
|
bool filesetDelimited; // returned blocks delimited by fileset
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
|
|
@ -171,7 +171,7 @@ void *tsdbGetIvtIdx2(SMeta *pMeta);
|
||||||
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
|
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
|
||||||
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||||
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
|
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
|
||||||
void tsdbSetDurationOrder(STsdbReader* pReader);
|
void tsdbSetFilesetDelimited(STsdbReader* pReader);
|
||||||
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
|
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
|
||||||
|
|
||||||
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
|
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
|
||||||
|
|
|
@ -423,7 +423,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->bDurationOrder = false;
|
pReader->bFilesetDelimited = false;
|
||||||
|
|
||||||
tsdbInitReaderLock(pReader);
|
tsdbInitReaderLock(pReader);
|
||||||
tsem_init(&pReader->resumeAfterSuspend, 0, 0);
|
tsem_init(&pReader->resumeAfterSuspend, 0, 0);
|
||||||
|
@ -2520,7 +2520,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
|
||||||
if (!pReader->status.bProcMemPreFileset) {
|
if (!pReader->status.bProcMemPreFileset) {
|
||||||
if (pReader->notifyFn) {
|
if (pReader->notifyFn) {
|
||||||
STsdReaderNotifyInfo info = {0};
|
STsdReaderNotifyInfo info = {0};
|
||||||
info.duration.fileSetId = fid;
|
info.duration.filesetId = fid;
|
||||||
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
|
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2570,7 +2570,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
|
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
|
||||||
if (pReader->bDurationOrder) {
|
if (pReader->bFilesetDelimited) {
|
||||||
prepareDurationForNextFileSet(pReader);
|
prepareDurationForNextFileSet(pReader);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -3915,7 +3915,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
|
||||||
if (pReader->bDurationOrder) {
|
if (pReader->bFilesetDelimited) {
|
||||||
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
|
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
|
||||||
pReader->status.bProcMemFirstFileset = true;
|
pReader->status.bProcMemFirstFileset = true;
|
||||||
}
|
}
|
||||||
|
@ -4227,7 +4227,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
||||||
|
|
||||||
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
|
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
|
||||||
pReader->pReadSnap = NULL;
|
pReader->pReadSnap = NULL;
|
||||||
if (pReader->bDurationOrder) {
|
if (pReader->bFilesetDelimited) {
|
||||||
pReader->status.memTableMinKey = INT64_MAX;
|
pReader->status.memTableMinKey = INT64_MAX;
|
||||||
pReader->status.memTableMaxKey = INT64_MIN;
|
pReader->status.memTableMaxKey = INT64_MIN;
|
||||||
}
|
}
|
||||||
|
@ -4342,7 +4342,7 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
||||||
return pBlock->info.rows > 0;
|
return pBlock->info.rows > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) {
|
static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
@ -4361,7 +4361,7 @@ static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) {
|
||||||
pStatus->bProcMemPreFileset = false;
|
pStatus->bProcMemPreFileset = false;
|
||||||
if (pReader->notifyFn) {
|
if (pReader->notifyFn) {
|
||||||
STsdReaderNotifyInfo info = {0};
|
STsdReaderNotifyInfo info = {0};
|
||||||
info.duration.fileSetId = fid;
|
info.duration.filesetId = fid;
|
||||||
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
|
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam);
|
||||||
}
|
}
|
||||||
resetTableListIndex(pStatus);
|
resetTableListIndex(pStatus);
|
||||||
|
@ -4425,10 +4425,10 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
||||||
if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
|
if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
|
||||||
return tsdbReadRowsCountOnly(pReader);
|
return tsdbReadRowsCountOnly(pReader);
|
||||||
}
|
}
|
||||||
if (!pReader->bDurationOrder) {
|
if (!pReader->bFilesetDelimited) {
|
||||||
code = doTsdbNextDataBlockFilesFirst(pReader);
|
code = doTsdbNextDataBlockFilesFirst(pReader);
|
||||||
} else {
|
} else {
|
||||||
code = doTsdbNextDataBlockDurationOrder(pReader);
|
code = doTsdbNextDataBlockFilesetDelimited(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
*hasNext = pBlock->info.rows > 0;
|
*hasNext = pBlock->info.rows > 0;
|
||||||
|
@ -5152,8 +5152,8 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
|
||||||
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSetDurationOrder(STsdbReader* pReader) {
|
void tsdbSetFilesetDelimited(STsdbReader* pReader) {
|
||||||
pReader->bDurationOrder = true;
|
pReader->bFilesetDelimited = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
|
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
|
||||||
|
|
|
@ -230,7 +230,7 @@ struct STsdbReader {
|
||||||
SBlockInfoBuf blockInfoBuf;
|
SBlockInfoBuf blockInfoBuf;
|
||||||
EContentData step;
|
EContentData step;
|
||||||
STsdbReader* innerReader[2];
|
STsdbReader* innerReader[2];
|
||||||
bool bDurationOrder; // duration by duration output
|
bool bFilesetDelimited; // duration by duration output
|
||||||
TsdReaderNotifyCbFn notifyFn;
|
TsdReaderNotifyCbFn notifyFn;
|
||||||
void* notifyParam;
|
void* notifyParam;
|
||||||
};
|
};
|
||||||
|
|
|
@ -61,7 +61,7 @@ void initTsdbReaderAPI(TsdReader* pReader) {
|
||||||
pReader->tsdSetQueryTableList = tsdbSetTableList2;
|
pReader->tsdSetQueryTableList = tsdbSetTableList2;
|
||||||
pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2;
|
pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2;
|
||||||
|
|
||||||
pReader->tsdSetDurationOrder = (void (*)(void*))tsdbSetDurationOrder;
|
pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited;
|
||||||
pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb;
|
pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -297,7 +297,7 @@ typedef struct STableMergeScanInfo {
|
||||||
SHashObj* mSkipTables;
|
SHashObj* mSkipTables;
|
||||||
int64_t mergeLimit;
|
int64_t mergeLimit;
|
||||||
SSortExecInfo sortExecInfo;
|
SSortExecInfo sortExecInfo;
|
||||||
bool bNewDuration;
|
bool bNewFileset;
|
||||||
bool bOnlyRetrieveBlock;
|
bool bOnlyRetrieveBlock;
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
||||||
|
|
|
@ -3261,7 +3261,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
||||||
pInfo->bNewDuration = false;
|
pInfo->bNewFileset = false;
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
|
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
|
||||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||||
|
@ -3269,7 +3269,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->bNewDuration) {
|
if (pInfo->bNewFileset) {
|
||||||
pInfo->bOnlyRetrieveBlock = true;
|
pInfo->bOnlyRetrieveBlock = true;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -3345,7 +3345,7 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
|
||||||
|
|
||||||
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
||||||
STableMergeScanInfo* pTmsInfo = param;
|
STableMergeScanInfo* pTmsInfo = param;
|
||||||
pTmsInfo->bNewDuration = true;
|
pTmsInfo->bNewFileset = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3355,7 +3355,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
||||||
|
|
||||||
pInfo->bNewDuration = false;
|
pInfo->bNewFileset = false;
|
||||||
|
|
||||||
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
|
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
@ -3432,7 +3432,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
if (r == 1) {
|
if (r == 1) {
|
||||||
uInfo("zsl: DURATION ORDER");
|
uInfo("zsl: DURATION ORDER");
|
||||||
pAPI->tsdReader.tsdSetDurationOrder(pInfo->base.dataReader);
|
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
|
||||||
} else {
|
} else {
|
||||||
uInfo("zsl: NO DURATION");
|
uInfo("zsl: NO DURATION");
|
||||||
}
|
}
|
||||||
|
@ -3541,7 +3541,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->bNewDuration) {
|
if (pInfo->bNewFileset) {
|
||||||
stopDurationForGroupTableMergeScan(pOperator);
|
stopDurationForGroupTableMergeScan(pOperator);
|
||||||
startDurationForGroupTableMergeScan(pOperator);
|
startDurationForGroupTableMergeScan(pOperator);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue