refactor: update the error logs.

This commit is contained in:
Haojun Liao 2024-09-24 18:19:47 +08:00
parent ece81177fa
commit 2b78b660bd
7 changed files with 45 additions and 29 deletions

View File

@ -798,7 +798,6 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
int32_t lino = 0;
int64_t curOwner = 0;
*pRes = NULL;
@ -846,7 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
if (code) {
pTaskInfo->code = code;
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
blockDataCheck(*pRes, false);

View File

@ -1297,10 +1297,17 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera
freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
pOperator->pDownstreamGetParams[idx] = NULL;
}
if (code) {
qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
}
return code;
}
code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
if (code) {
qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
}
return code;
}

View File

@ -67,6 +67,9 @@ int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}
@ -518,6 +521,7 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock);
if (code) {
qError("failed to get next data block from upstream, code:%s", tstrerror(code));
pTaskInfo->code = code;
return code;
}

View File

@ -889,6 +889,7 @@ int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam*
} else {
code = pOperator->fpSet.getNextFn(pOperator, pRes);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
pOperator->pTaskInfo->code = code;
}
}

View File

@ -1378,8 +1378,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
taosRUnLockLatch(&pTaskInfo->lock);
lino = __LINE__;
goto _end;
TSDB_CHECK_CODE(code, lino, _end);
}
if (pInfo->currentTable >= numOfTables) {
@ -1391,11 +1390,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
taosRUnLockLatch(&pTaskInfo->lock);
(*ppRes) = NULL;
return terrno;
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
tInfo = *tmp;
taosRUnLockLatch(&pTaskInfo->lock);
@ -1410,11 +1409,12 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
}
} else { // scan table group by group sequentially
code = groupSeqTableScan(pOperator, ppRes);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
pTaskInfo->code = code;
}
@ -5820,9 +5820,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STupleHandle* pTupleHandle = NULL;
blockDataCleanup(pResBlock);
STupleHandle* pTupleHandle = NULL;
while (1) {
while (1) {
pTupleHandle = NULL;

View File

@ -204,16 +204,17 @@ int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle)
* @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
* @param [in, out] pBlock the output block, the group id will be saved in it
* @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
* @retval NULL if no more tuples
*/
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t code = 0;
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
STupleHandle** pTupleHandle) {
QRY_PARAM_CHECK(pTupleHandle);
int32_t code = 0;
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
if (!retTuple) {
code = tsortNextTuple(pHandle, &retTuple);
if (code) {
return NULL;
}
qError("failed to get next tuple, code:%s", tstrerror(code));
return code;
}
if (retTuple) {
@ -225,7 +226,8 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
&pInfo->pGroupIdCalc->lastKeysLen, retTuple);
}
bool emptyBlock = pBlock->info.rows == 0;
bool emptyBlock = (pBlock->info.rows == 0);
if (newGroup) {
if (!emptyBlock) {
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
@ -247,17 +249,20 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
}
}
return retTuple;
*pTupleHandle = retTuple;
return code;
}
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t lino = 0;
int32_t code = 0;
SSDataBlock* p = NULL;
int32_t lino = 0;
int32_t code = 0;
STupleHandle* pTupleHandle = NULL;
SSDataBlock* p = NULL;
code = tsortGetSortedDataBlock(pHandle, &p);
if (p == NULL || (code != 0)) {
return code;
@ -266,18 +271,14 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
code = blockDataEnsureCapacity(p, capacity);
QUERY_CHECK_CODE(code, lino, _error);
STupleHandle* pTupleHandle;
while (1) {
if (pInfo->pGroupIdCalc) {
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
} else {
code = tsortNextTuple(pHandle, &pTupleHandle);
}
if (pTupleHandle == NULL || code != 0) {
lino = __LINE__;
break;
}
TSDB_CHECK_CODE(code, lino, _error);
code = appendOneRowToDataBlock(p, pTupleHandle);
QUERY_CHECK_CODE(code, lino, _error);
@ -320,7 +321,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
return code;
_error:
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
blockDataDestroy(p);
return code;
@ -330,6 +331,9 @@ int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}

View File

@ -771,7 +771,7 @@ static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam*
code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
if (code != TSDB_CODE_SUCCESS) {
return terrno = code;
return code;
}
if (pHandle->pDataBlock->info.rows >= capacity) {
@ -2867,6 +2867,7 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
pHandle->tupleHandle.pBlock = NULL;
return code;
}
pHandle->tupleHandle.pBlock = pBlock;
pHandle->tupleHandle.rowIndex = 0;
}
@ -2882,8 +2883,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
}
if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
code = TSDB_CODE_INVALID_PARA;
return code;
return TSDB_CODE_INVALID_PARA;
}
pHandle->opened = true;