adj operator res
This commit is contained in:
parent
14556de514
commit
1edcd654d4
|
@ -173,6 +173,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
||||
|
||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
|
||||
|
||||
uint64_t suid = tableListGetSuid(pTableListInfo);
|
||||
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
|
||||
|
@ -413,8 +414,8 @@ void destroyCacheScanOperator(void* param) {
|
|||
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
|
||||
blockDataDestroy(pInfo->pRes);
|
||||
blockDataDestroy(pInfo->pBufferedRes);
|
||||
taosMemoryFree(pInfo->pSlotIds);
|
||||
taosMemoryFree(pInfo->pDstSlotIds);
|
||||
taosMemoryFreeClear(pInfo->pSlotIds);
|
||||
taosMemoryFreeClear(pInfo->pDstSlotIds);
|
||||
taosArrayDestroy(pInfo->pCidList);
|
||||
taosArrayDestroy(pInfo->pFuncTypeList);
|
||||
taosArrayDestroy(pInfo->pUidList);
|
||||
|
@ -436,13 +437,13 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
|
|||
|
||||
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||
if (*pSlotIds == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
*pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||
if (*pDstSlotIds == NULL) {
|
||||
taosMemoryFree(*pSlotIds);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(*pSlotIds);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);
|
||||
|
|
|
@ -79,9 +79,15 @@ static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* p
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup);
|
||||
QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
|
||||
(*pResult) = &pBuff->row;
|
||||
code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
|
||||
(*ppResBuff) = pBuff;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1274,6 +1274,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
if (uid == 0) {
|
||||
if (numOfTables != 0) {
|
||||
STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
return terrno;
|
||||
}
|
||||
if (tmp) uid = tmp->uid;
|
||||
ts = INT64_MIN;
|
||||
pScanInfo->currentTable = 0;
|
||||
|
@ -1403,6 +1408,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
}
|
||||
|
||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
if (!pList) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
tDeleteSchemaWrapper(mtInfo.schema);
|
||||
return code;
|
||||
}
|
||||
int32_t size = tableListGetSize(pTableListInfo);
|
||||
|
||||
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
|
||||
|
@ -1487,6 +1497,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
|
|||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
}
|
||||
|
|
|
@ -1100,6 +1100,11 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
|
|||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
|
||||
if (!pTable) {
|
||||
taosArrayDestroy(pDeleterParam->pUidList);
|
||||
taosMemoryFree(pDeleterParam);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
|
||||
if (!tmp) {
|
||||
taosArrayDestroy(pDeleterParam->pUidList);
|
||||
|
|
|
@ -1257,8 +1257,14 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
(*ppRes) = NULL;
|
||||
return terrno;
|
||||
}
|
||||
tInfo = *tmp;
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
code = pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
|
||||
|
@ -3507,6 +3513,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
|||
size_t size = tableListGetSize(pTableListInfo);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
}
|
||||
|
@ -4110,6 +4117,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
|
|||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||
|
||||
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
|
||||
if (!item) {
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
||||
GET_TASKID(pTaskInfo));
|
||||
tDecoderClear(&(*mr).coder);
|
||||
pAPI->metaReaderFn.clearReader(mr);
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid);
|
||||
tDecoderClear(&(*mr).coder);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4787,18 +4802,23 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||
static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||
pInfo->bGroupProcessed = false;
|
||||
|
||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < numOfTables; ++i) {
|
||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||
if (!tableKeyInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
if (tableKeyInfo->groupId != pInfo->groupId) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pInfo->tableEndIndex = i - 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||
|
@ -4823,7 +4843,11 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
|||
}
|
||||
|
||||
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
||||
setGroupStartEndIndex(pInfo);
|
||||
int32_t code = setGroupStartEndIndex(pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
|
||||
if (pSubTblsInfo == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -4842,7 +4866,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
|||
}
|
||||
int32_t bufPageSize = pInfo->bufPageSize;
|
||||
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
||||
int32_t code =
|
||||
code =
|
||||
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(pSubTblsInfo->aInputs);
|
||||
|
@ -4879,6 +4903,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(keyInfo, code, lino, _end, terrno);
|
||||
pInput->pKeyInfo = keyInfo;
|
||||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
|
@ -5114,7 +5139,9 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
return code;
|
||||
}
|
||||
pInfo->tableStartIndex = 0;
|
||||
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
|
||||
STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
|
||||
pInfo->groupId = pTmpGpId->groupId;
|
||||
code = startSubTablesTableMergeScan(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
@ -5128,6 +5155,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
|
||||
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
|
||||
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
|
||||
}
|
||||
if (pBlock != NULL) {
|
||||
|
@ -5144,7 +5172,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
}
|
||||
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||
STableKeyInfo* pTmpGpId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
|
||||
|
||||
pInfo->groupId = pTmpGpId->groupId;
|
||||
code = startSubTablesTableMergeScan(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
|
@ -5473,6 +5504,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < numOfTables; ++i) {
|
||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(tableKeyInfo, code, lino, _end, terrno);
|
||||
if (tableKeyInfo->groupId != pInfo->groupId) {
|
||||
break;
|
||||
}
|
||||
|
@ -5485,6 +5517,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||
QUERY_CHECK_NULL(startKeyInfo, code, lino, _end, terrno);
|
||||
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->mSkipTables);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -5598,7 +5631,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
return code;
|
||||
}
|
||||
pInfo->tableStartIndex = 0;
|
||||
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
|
||||
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
pInfo->groupId = tmp->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
}
|
||||
|
||||
|
@ -5612,6 +5647,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
pOperator);
|
||||
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
|
||||
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
|
||||
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
|
||||
}
|
||||
if (pBlock != NULL) {
|
||||
|
@ -5633,7 +5669,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
}
|
||||
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||
STableKeyInfo* tmp = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
pInfo->groupId = tmp->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
}
|
||||
|
|
|
@ -2722,6 +2722,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
|||
pInfo->pTableListInfo = pTableListInfo;
|
||||
size_t num = tableListGetSize(pTableListInfo);
|
||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
|
||||
|
||||
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
|
||||
(void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);
|
||||
|
|
Loading…
Reference in New Issue