fix: first running
This commit is contained in:
parent
4484e99e6c
commit
2f6a2923b7
|
@ -3318,6 +3318,7 @@ _error:
|
||||||
// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish
|
// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish
|
||||||
// TODO: error processing, memory freeing
|
// TODO: error processing, memory freeing
|
||||||
// TODO: add log for error and perf
|
// TODO: add log for error and perf
|
||||||
|
// TODO: tsdb reader open/close dynamically
|
||||||
|
|
||||||
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
int32_t left = *(int32_t*)pLeft;
|
int32_t left = *(int32_t*)pLeft;
|
||||||
|
@ -3350,11 +3351,14 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
|
|
||||||
blockDataCleanup(pInput->pBlock);
|
blockDataCleanup(pInput->pBlock);
|
||||||
|
|
||||||
|
pInfo->base.dataReader = pInput->pReader;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
bool hasNext = false;
|
bool hasNext = false;
|
||||||
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext);
|
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader);
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader);
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
||||||
|
@ -3367,6 +3371,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status);
|
code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
||||||
|
@ -3379,8 +3384,10 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
*pSubTableHasBlock = true;
|
*pSubTableHasBlock = true;
|
||||||
pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid);
|
pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid);
|
||||||
pOperator->resultInfo.totalRows += pInput->pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pInput->pBlock->info.rows;
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
*pSubTableHasBlock = false;
|
*pSubTableHasBlock = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -3488,7 +3495,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
|
||||||
pInput->type = SUB_TABLE_MEM_BLOCK;
|
pInput->type = SUB_TABLE_MEM_BLOCK;
|
||||||
pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
|
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
|
||||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInfo->pReaderBlock,
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInput->pBlock,
|
||||||
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
||||||
bool hasNext = true;
|
bool hasNext = true;
|
||||||
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
|
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
|
||||||
|
@ -3638,7 +3645,7 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
|
||||||
if (pSubTblsInfo != NULL) {
|
if (pSubTblsInfo != NULL) {
|
||||||
tMergeTreeDestroy(&pSubTblsInfo->pTree);
|
tMergeTreeDestroy(&pSubTblsInfo->pTree);
|
||||||
|
|
||||||
for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) {
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
|
||||||
blockDataDestroy(pInput->pBlock);
|
blockDataDestroy(pInput->pBlock);
|
||||||
|
@ -4272,7 +4279,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScanSubTables, NULL, destroyTableMergeScanOperatorInfo,
|
||||||
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
Loading…
Reference in New Issue