commit
6fd580e476
|
@ -187,6 +187,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
||||
|
||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
|
||||
|
||||
uint64_t suid = tableListGetSuid(pTableListInfo);
|
||||
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
|
||||
|
@ -439,8 +440,8 @@ void destroyCacheScanOperator(void* param) {
|
|||
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
|
||||
blockDataDestroy(pInfo->pRes);
|
||||
blockDataDestroy(pInfo->pBufferedRes);
|
||||
taosMemoryFree(pInfo->pSlotIds);
|
||||
taosMemoryFree(pInfo->pDstSlotIds);
|
||||
taosMemoryFreeClear(pInfo->pSlotIds);
|
||||
taosMemoryFreeClear(pInfo->pDstSlotIds);
|
||||
taosArrayDestroy(pInfo->pCidList);
|
||||
taosArrayDestroy(pInfo->pFuncTypeList);
|
||||
taosArrayDestroy(pInfo->pUidList);
|
||||
|
@ -462,13 +463,13 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
|
|||
|
||||
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||
if (*pSlotIds == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
*pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||
if (*pDstSlotIds == NULL) {
|
||||
taosMemoryFree(*pSlotIds);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFreeClear(*pSlotIds);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);
|
||||
|
|
|
@ -82,9 +82,15 @@ static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* p
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup);
|
||||
QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
|
||||
(*pResult) = &pBuff->row;
|
||||
code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
|
||||
(*ppResBuff) = pBuff;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1293,6 +1293,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
if (uid == 0) {
|
||||
if (numOfTables != 0) {
|
||||
STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
return terrno;
|
||||
}
|
||||
if (tmp) uid = tmp->uid;
|
||||
ts = INT64_MIN;
|
||||
pScanInfo->currentTable = 0;
|
||||
|
@ -1422,6 +1427,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
}
|
||||
|
||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
if (!pList) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
tDeleteSchemaWrapper(mtInfo.schema);
|
||||
return code;
|
||||
}
|
||||
int32_t size = tableListGetSize(pTableListInfo);
|
||||
|
||||
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
|
||||
|
@ -1506,6 +1516,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
|
|||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
}
|
||||
|
|
|
@ -1107,6 +1107,11 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
|
|||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
|
||||
if (!pTable) {
|
||||
taosArrayDestroy(pDeleterParam->pUidList);
|
||||
taosMemoryFree(pDeleterParam);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
|
||||
if (!tmp) {
|
||||
taosArrayDestroy(pDeleterParam->pUidList);
|
||||
|
|
|
@ -1261,8 +1261,14 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
(*ppRes) = NULL;
|
||||
return terrno;
|
||||
}
|
||||
tInfo = *tmp;
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
code = pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
|
||||
|
@ -3517,6 +3523,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
|||
size_t size = tableListGetSize(pTableListInfo);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
}
|
||||
|
@ -4120,6 +4127,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
|
|||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||
|
||||
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
|
||||
if (!item) {
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
||||
GET_TASKID(pTaskInfo));
|
||||
tDecoderClear(&(*mr).coder);
|
||||
pAPI->metaReaderFn.clearReader(mr);
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid);
|
||||
tDecoderClear(&(*mr).coder);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4805,18 +4820,23 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||
static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||
pInfo->bGroupProcessed = false;
|
||||
|
||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < numOfTables; ++i) {
|
||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||
if (!tableKeyInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
if (tableKeyInfo->groupId != pInfo->groupId) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pInfo->tableEndIndex = i - 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||
|
@ -4845,7 +4865,11 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
|||
}
|
||||
|
||||
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
||||
setGroupStartEndIndex(pInfo);
|
||||
int32_t code = setGroupStartEndIndex(pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
|
||||
if (pSubTblsInfo == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -4872,7 +4896,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
|||
}
|
||||
int32_t bufPageSize = pInfo->bufPageSize;
|
||||
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
||||
int32_t code =
|
||||
code =
|
||||
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(pSubTblsInfo->aInputs);
|
||||
|
@ -5150,7 +5174,9 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
return code;
|
||||
}
|
||||
pInfo->tableStartIndex = 0;
|
||||
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
|
||||
STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
|
||||
pInfo->groupId = pTmpGpId->groupId;
|
||||
code = startSubTablesTableMergeScan(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
@ -5164,6 +5190,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
|
||||
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
|
||||
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
|
||||
}
|
||||
if (pBlock != NULL) {
|
||||
|
@ -5180,7 +5207,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
}
|
||||
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||
STableKeyInfo* pTmpGpId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
|
||||
|
||||
pInfo->groupId = pTmpGpId->groupId;
|
||||
code = startSubTablesTableMergeScan(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
|
@ -5510,6 +5540,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
int32_t i = pInfo->tableStartIndex + 1;
|
||||
for (; i < numOfTables; ++i) {
|
||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||
QUERY_CHECK_NULL(tableKeyInfo, code, lino, _end, terrno);
|
||||
if (tableKeyInfo->groupId != pInfo->groupId) {
|
||||
break;
|
||||
}
|
||||
|
@ -5635,7 +5666,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
return code;
|
||||
}
|
||||
pInfo->tableStartIndex = 0;
|
||||
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
|
||||
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
pInfo->groupId = tmp->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
}
|
||||
|
||||
|
@ -5649,6 +5682,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
pOperator);
|
||||
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
|
||||
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
|
||||
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
|
||||
}
|
||||
if (pBlock != NULL) {
|
||||
|
@ -5670,7 +5704,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
}
|
||||
|
||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||
STableKeyInfo* tmp = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
pInfo->groupId = tmp->groupId;
|
||||
startGroupTableMergeScan(pOperator);
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue