ehn(query): remove query executor assert

This commit is contained in:
54liuyao 2024-08-23 11:01:45 +08:00
parent 2085ea9375
commit de4f09b8fd
12 changed files with 142 additions and 60 deletions

View File

@ -126,7 +126,7 @@ uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint6
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
int32_t* num); int32_t* num);
uint64_t tableListGetSize(const STableListInfo* pTableList); int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes);
uint64_t tableListGetSuid(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex); int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);

View File

@ -176,7 +176,10 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds); code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
int32_t totalTables = tableListGetSize(pTableListInfo); int32_t totalTables = 0;
code = tableListGetSize(pTableListInfo, &totalTables);
QUERY_CHECK_CODE(code, lino, _error);
int32_t capacity = 0; int32_t capacity = 0;
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
@ -271,7 +274,10 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SSDataBlock* pBufRes = pInfo->pBufferedRes; SSDataBlock* pBufRes = pInfo->pBufferedRes;
uint64_t suid = tableListGetSuid(pTableList); uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList); int32_t size = 0;
code = tableListGetSize(pTableList, &size);
QUERY_CHECK_CODE(code, lino, _end);
if (size == 0) { if (size == 0) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
(*ppRes) = NULL; (*ppRes) = NULL;

View File

@ -894,12 +894,12 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
int32_t rawLen = *(int32_t*)pStart; int32_t rawLen = *(int32_t*)pStart;
pStart += sizeof(int32_t); pStart += sizeof(int32_t);
ASSERT(compLen <= rawLen && compLen != 0); QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
pNextStart = pStart + compLen; pNextStart = pStart + compLen;
if (pRetrieveRsp->compressed && (compLen < rawLen)) { if (pRetrieveRsp->compressed && (compLen < rawLen)) {
int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0); int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(t == rawLen); QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
pStart = pDataInfo->decompBuf; pStart = pDataInfo->decompBuf;
} }

View File

@ -197,7 +197,6 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
pGroupResInfo->freeItem = true; pGroupResInfo->freeItem = true;
pGroupResInfo->pRows = pArrayList; pGroupResInfo->pRows = pArrayList;
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
} }
bool hasRemainResults(SGroupResInfo* pGroupResInfo) { bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
@ -1560,7 +1559,11 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode,
return code; return code;
} }
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE); if (nodeType(pNew) != QUERY_NODE_VALUE) {
nodesDestroyList(groupNew);
pAPI->metaReaderFn.clearReader(&mr);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SValueNode* pValue = (SValueNode*)pNew; SValueNode* pValue = (SValueNode*)pNew;
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) { if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
@ -1879,7 +1882,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pNode; pExp->pExpr->_optrRoot.pRootNode = pNode;
} else { } else {
ASSERT(0); code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
} }
_end: _end:
@ -2149,7 +2153,7 @@ int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SAr
} else if (p->info.colId < pmInfo->colId) { } else if (p->info.colId < pmInfo->colId) {
i++; i++;
} else { } else {
ASSERT(0); return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
} }
} }
return code; return code;
@ -2383,9 +2387,13 @@ void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
pLimitInfo->remainOffset = pLimitInfo->limit.offset; pLimitInfo->remainOffset = pLimitInfo->limit.offset;
} }
uint64_t tableListGetSize(const STableListInfo* pTableList) { int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
return taosArrayGetSize(pTableList->pTableList); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
(*pRes) = taosArrayGetSize(pTableList->pTableList);
return TSDB_CODE_SUCCESS;
} }
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; } uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
@ -2430,7 +2438,6 @@ uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tab
} }
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot); STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
ASSERT(pKeyInfo->uid == tableUid);
return pKeyInfo->groupId; return pKeyInfo->groupId;
} }
@ -2457,7 +2464,8 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot)); code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
ASSERT(code != TSDB_CODE_DUP_KEY); // we have checked the existence of uid in hash map above // we have checked the existence of uid in hash map above
QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
taosArrayPopTailBatch(pTableList->pTableList, 1); // let's pop the last element in the array list taosArrayPopTailBatch(pTableList->pTableList, 1); // let's pop the last element in the array list
} }
@ -2474,7 +2482,12 @@ _end:
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t* size) { int32_t* size) {
int32_t totalGroups = tableListGetOutputGroups(pTableList); int32_t totalGroups = tableListGetOutputGroups(pTableList);
int32_t numOfTables = tableListGetSize(pTableList); int32_t numOfTables = 0;
int32_t code = tableListGetSize(pTableList, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) { if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;

View File

@ -1295,7 +1295,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// this value may be changed if new tables are created // this value may be changed if new tables are created
taosRLockLatch(&pTaskInfo->lock); taosRLockLatch(&pTaskInfo->lock);
int32_t numOfTables = tableListGetSize(pTableListInfo); int32_t numOfTables = 0;
code = tableListGetSize(pTableListInfo, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
taosRUnLockLatch(&pTaskInfo->lock);
return code;
}
if (uid == 0) { if (uid == 0) {
if (numOfTables != 0) { if (numOfTables != 0) {
@ -1439,7 +1445,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tDeleteSchemaWrapper(mtInfo.schema); tDeleteSchemaWrapper(mtInfo.schema);
return code; return code;
} }
int32_t size = tableListGetSize(pTableListInfo); int32_t size = 0;
code = tableListGetSize(pTableListInfo, &size);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
tDeleteSchemaWrapper(mtInfo.schema);
return code;
}
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
NULL, (void**)&pInfo->dataReader, NULL, NULL); NULL, (void**)&pInfo->dataReader, NULL, NULL);
@ -1520,7 +1532,10 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t)); SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno); QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);
int32_t numOfTables = tableListGetSize(pTableListInfo); int32_t numOfTables = 0;
code = tableListGetSize(pTableListInfo, &numOfTables);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno); QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);

View File

@ -1115,7 +1115,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
pDeleterParam->suid = tableListGetSuid(pTableListInfo); pDeleterParam->suid = tableListGetSuid(pTableListInfo);
// TODO extract uid list // TODO extract uid list
int32_t numOfTables = tableListGetSize(pTableListInfo); int32_t numOfTables = 0;
code = tableListGetSize(pTableListInfo, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
taosMemoryFree(pDeleterParam);
return code;
}
pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t)); pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
if (NULL == pDeleterParam->pUidList) { if (NULL == pDeleterParam->pUidList) {
taosMemoryFree(pDeleterParam); taosMemoryFree(pDeleterParam);

View File

@ -213,7 +213,6 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
memcpy(pkey->pData, val, dataLen); memcpy(pkey->pData, val, dataLen);
} else if (IS_VAR_DATA_TYPE(pkey->type)) { } else if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val)); memcpy(pkey->pData, val, varDataTLen(val));
ASSERT(varDataTLen(val) <= pkey->bytes);
} else { } else {
memcpy(pkey->pData, val, pkey->bytes); memcpy(pkey->pData, val, pkey->bytes);
} }
@ -241,7 +240,6 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
} else if (IS_VAR_DATA_TYPE(pkey->type)) { } else if (IS_VAR_DATA_TYPE(pkey->type)) {
varDataCopy(pStart, pkey->pData); varDataCopy(pStart, pkey->pData);
pStart += varDataTLen(pkey->pData); pStart += varDataTLen(pkey->pData);
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
} else { } else {
memcpy(pStart, pkey->pData, pkey->bytes); memcpy(pStart, pkey->pData, pkey->bytes);
pStart += pkey->bytes; pStart += pkey->bytes;
@ -740,7 +738,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
memcpy(data + (*columnLen), src, dataLen); memcpy(data + (*columnLen), src, dataLen);
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
ASSERT(v > 0); QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
contentLen = dataLen; contentLen = dataLen;
} else { } else {
@ -748,7 +746,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
char* src = colDataGetData(pColInfoData, j); char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src)); memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0); QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
contentLen = varDataTLen(src); contentLen = varDataTLen(src);
} }
@ -762,7 +760,8 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
colDataSetNull_f(bitmap, (*rows)); colDataSetNull_f(bitmap, (*rows));
} else { } else {
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)); QUERY_CHECK_CONDITION(((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)), code,
lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
} }
contentLen = bytes; contentLen = bytes;
} }
@ -1299,7 +1298,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SStreamPartitionOperatorInfo* pInfo = pOperator->info; SStreamPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pDest = pInfo->binfo.pRes; SSDataBlock* pDest = pInfo->binfo.pRes;
ASSERT(hasRemainPartion(pInfo)); QUERY_CHECK_CONDITION((hasRemainPartion(pInfo)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte; SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
blockDataCleanup(pDest); blockDataCleanup(pDest);
int32_t rows = taosArrayGetSize(pParInfo->rowIds); int32_t rows = taosArrayGetSize(pParInfo->rowIds);
@ -1343,7 +1342,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pDest->info.id.groupId = pParInfo->groupId; pDest->info.id.groupId = pParInfo->groupId;
pOperator->resultInfo.totalRows += pDest->info.rows; pOperator->resultInfo.totalRows += pDest->info.rows;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte); pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
ASSERT(pDest->info.rows > 0); QUERY_CHECK_CONDITION((pDest->info.rows > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
_end: _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1549,7 +1548,8 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock**
return code; return code;
} }
default: default:
ASSERTS(0, "invalid SSDataBlock type"); code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
} }
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.

View File

@ -1082,7 +1082,10 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
return getBlockForEmptyTable(pOperator, pStart); return getBlockForEmptyTable(pOperator, pStart);
} }
} else { // group by tag + no sort } else { // group by tag + no sort
int32_t numOfTables = tableListGetSize(pTableListInfo); int32_t numOfTables = 0;
code = tableListGetSize(pTableListInfo, &numOfTables);
QUERY_CHECK_CODE(code, lino, _end);
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) { if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
// get empty group, mark processed & rm from hash // get empty group, mark processed & rm from hash
void* pIte = taosHashIterate(pTableListInfo->remainGroups, NULL); void* pIte = taosHashIterate(pTableListInfo->remainGroups, NULL);
@ -1171,7 +1174,10 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t numOfTables = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
QUERY_CHECK_CODE(code, lino, _end);
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (pOperator->dynamicTask) { if (pOperator->dynamicTask) {
@ -1304,7 +1310,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// scan table one by one sequentially // scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = 0; // tableListGetSize(pTaskInfo->pTableListInfo); int32_t numOfTables = 0;
STableKeyInfo tInfo = {0}; STableKeyInfo tInfo = {0};
pInfo->countState = TABLE_COUNT_STATE_END; pInfo->countState = TABLE_COUNT_STATE_END;
@ -1319,7 +1325,13 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pInfo->currentTable++; pInfo->currentTable++;
taosRLockLatch(&pTaskInfo->lock); taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pInfo->base.pTableListInfo); numOfTables = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
taosRUnLockLatch(&pTaskInfo->lock);
lino = __LINE__;
goto _end;
}
if (pInfo->currentTable >= numOfTables) { if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
@ -3612,7 +3624,9 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno); QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno);
// Transfer the Array of STableKeyInfo into uid list. // Transfer the Array of STableKeyInfo into uid list.
size_t size = tableListGetSize(pTableListInfo); int32_t size = 0;
code = tableListGetSize(pTableListInfo, &size);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i); STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno); QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
@ -4628,7 +4642,13 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes); blockDataCleanup(pRes);
int32_t size = tableListGetSize(pInfo->pTableListInfo); int32_t size = 0;
code = tableListGetSize(pInfo->pTableListInfo, &size);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
if (size == 0) { if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED); setTaskStatus(pTaskInfo, TASK_COMPLETED);
(*ppRes) = NULL; (*ppRes) = NULL;
@ -4926,7 +4946,13 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) { static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
pInfo->bGroupProcessed = false; pInfo->bGroupProcessed = false;
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t numOfTables = 0;
int32_t code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
int32_t i = pInfo->tableStartIndex + 1; int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) { for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
@ -5267,7 +5293,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo); int32_t tableListSize = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
QUERY_CHECK_CODE(code, lino, _end);
if (!pInfo->hasGroupId) { if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;
@ -5644,7 +5673,10 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
qDebug("%s table merge scan start group %" PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId); qDebug("%s table merge scan start group %" PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
{ {
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t numOfTables = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
QUERY_CHECK_CODE(code, lino, _end);
int32_t i = pInfo->tableStartIndex + 1; int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) { for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
@ -5764,7 +5796,10 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo); int32_t tableListSize = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
QUERY_CHECK_CODE(code, lino, _end);
if (!pInfo->hasGroupId) { if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;

View File

@ -2843,7 +2843,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->pTableListInfo = pTableListInfo; pInfo->pTableListInfo = pTableListInfo;
size_t num = tableListGetSize(pTableListInfo); int32_t num = 0;
code = tableListGetSize(pTableListInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
void* pList = tableListGetInfo(pTableListInfo, 0); void* pList = tableListGetInfo(pTableListInfo, 0);
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,

View File

@ -348,7 +348,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVa
saveColData(pRowVal->pRowVal, i, p, reset ? true : isNull); saveColData(pRowVal->pRowVal, i, p, reset ? true : isNull);
} else { } else {
ASSERT(0); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
} }
} }
} }
@ -362,10 +362,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
bool ascFill = FILL_IS_ASC_FILL(pFillInfo); bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
#if 0
ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start)));
#endif
while (pFillInfo->numOfCurrent < outputRows) { while (pFillInfo->numOfCurrent < outputRows) {
int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index]; int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
@ -392,7 +388,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
goto _end; goto _end;
} }
} else { } else {
ASSERT(pFillInfo->currentKey == ts); QUERY_CHECK_CONDITION((pFillInfo->currentKey == ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
int32_t index = pBlock->info.rows; int32_t index = pBlock->info.rows;
int32_t nextRowIndex = pFillInfo->index + 1; int32_t nextRowIndex = pFillInfo->index + 1;
@ -500,7 +496,9 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
} }
} }
static void appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) { static int32_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
/* /*
* These data are generated according to fill strategy, since the current timestamp is out of the time window of * These data are generated according to fill strategy, since the current timestamp is out of the time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data. * real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
@ -512,7 +510,14 @@ static void appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_
pFillInfo->numOfTotal += pFillInfo->numOfCurrent; pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
ASSERT(pFillInfo->numOfCurrent == resultCapacity); QUERY_CHECK_CONDITION((pFillInfo->numOfCurrent == resultCapacity), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
} }
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
@ -635,15 +640,6 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
// the endKey is now the aligned time window value. truncate time window isn't correct. // the endKey is now the aligned time window value. truncate time window isn't correct.
pFillInfo->end = endKey; pFillInfo->end = endKey;
#if 0
if (pFillInfo->order == TSDB_ORDER_ASC) {
ASSERT(pFillInfo->start <= pFillInfo->end);
} else {
ASSERT(pFillInfo->start >= pFillInfo->end);
}
#endif
pFillInfo->index = 0; pFillInfo->index = 0;
pFillInfo->numOfRows = numOfRows; pFillInfo->numOfRows = numOfRows;
} }
@ -687,7 +683,6 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
numOfRes = numOfRes =
taosTimeCountIntervalForFill(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding, taosTimeCountIntervalForFill(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order); pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);
ASSERT(numOfRes >= numOfRows);
} else { // reach the end of data } else { // reach the end of data
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) || if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
(ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) { (ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
@ -719,23 +714,30 @@ void taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* po
int32_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) { int32_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t remain = taosNumOfRemainRows(pFillInfo); int32_t remain = taosNumOfRemainRows(pFillInfo);
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity); int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
ASSERT(numOfRes <= capacity); QUERY_CHECK_CONDITION((numOfRes <= capacity), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
// no data existed for fill operation now, append result according to the fill strategy // no data existed for fill operation now, append result according to the fill strategy
if (remain == 0) { if (remain == 0) {
appendFilledResult(pFillInfo, p, numOfRes); code = appendFilledResult(pFillInfo, p, numOfRes);
QUERY_CHECK_CODE(code, lino, _end);
} else { } else {
code = fillResultImpl(pFillInfo, p, (int32_t)numOfRes); code = fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
ASSERT(numOfRes == pFillInfo->numOfCurrent); QUERY_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CONDITION((numOfRes == pFillInfo->numOfCurrent), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
} }
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64 qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
", current : % d, total : % d, %s", ", current : % d, total : % d, %s",
pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey, pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id); pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code; return code;
} }

View File

@ -79,6 +79,7 @@ _error:
int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) { int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) {
if (tBloomFilterIsFull(pBF)) { if (tBloomFilterIsFull(pBF)) {
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
bool hasChange = false; bool hasChange = false;

View File

@ -118,7 +118,7 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int3
} }
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf); QUERY_CHECK_NULL(pNormalBf, code, lino, _end, terrno);
if (tBloomFilterIsFull(pNormalBf)) { if (tBloomFilterIsFull(pNormalBf)) {
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf); pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);