Merge pull request #28201 from taosdata/fix/3_liaohj

refactor: update the error logs.
This commit is contained in:
Haojun Liao 2024-09-30 13:00:15 +08:00 committed by GitHub
commit 09b729f97d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 110 additions and 71 deletions

View File

@ -70,6 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
extern int32_t streamMetaId;
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
@ -135,11 +137,6 @@ enum {
STREAM_QUEUE__PROCESSING,
};
enum {
STREAM_META_WILL_STOP = 1,
STREAM_META_OK_TO_STOP = 2,
};
typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1,
TASK_EVENT_INIT_SCANHIST = 0x2,
@ -282,7 +279,6 @@ typedef enum {
} EConsenChkptStatus;
typedef struct SConsenChkptInfo {
// bool alreadySendChkptId;
EConsenChkptStatus status;
int64_t statusTs;
int32_t consenChkptTransId;

View File

@ -16,8 +16,13 @@
#include "tq.h"
#include "vnd.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100
typedef struct SBuildScanWalMsgParam {
int64_t metaId;
int32_t numOfTasks;
} SBuildScanWalMsgParam;
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
@ -31,13 +36,12 @@ int32_t tqScanWal(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs();
int32_t numOfTasks = 0;
bool shouldIdle = true;
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
// check all tasks
int32_t numOfTasks = 0;
bool shouldIdle = true;
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
if (code) {
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
@ -68,54 +72,61 @@ int32_t tqScanWal(STQ* pTq) {
return code;
}
typedef struct SBuildScanWalMsgParam {
STQ* pTq;
int32_t numOfTasks;
} SBuildScanWalMsgParam;
static void doStartScanWal(void* param, void* tmrId) {
int32_t vgId = 0;
STQ* pTq = NULL;
int32_t code = 0;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
STQ* pTq = pParam->pTq;
int32_t vgId = pTq->pStreamMeta->vgId;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId);
if (pMeta == NULL) {
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
taosMemoryFree(pParam);
return;
}
vgId = pMeta->vgId;
pTq = pMeta->ahandle;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pTq->pVnode->restored);
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
taosMemoryFree(pParam);
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
code = taosReleaseRef(streamMetaId, pParam->metaId);
if (code) {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
}
taosMemoryFree(pParam);
}
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = NULL;
SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
if (pParam == NULL) {
return terrno;
}
pParam->pTq = pTq;
pParam->metaId = pMeta->rid;
pParam->numOfTasks = numOfTasks;
tmr_h pTimer = NULL;
code = streamTimerGetInstance(&pTimer);
if (code) {
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
return code;
}
if (pMeta->scanInfo.scanTimer == NULL) {
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
taosMemoryFree(pParam);
} else {
bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
if (!ret) {
// tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration);
}
streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
}
return code;
@ -124,8 +135,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored;
bool alreadyRestored = pTq->pVnode->restored;
int32_t numOfTasks = 0;
// do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
@ -134,7 +145,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
streamMetaWLock(pMeta);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
streamMetaWUnLock(pMeta);
@ -378,13 +389,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pTaskId = taosArrayGet(pTaskList, i);
STaskId* pTaskId = taosArrayGet(pTaskList, i);
if (pTaskId == NULL) {
continue;
}
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL || code != 0) {
continue;
}

View File

@ -798,7 +798,6 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
int32_t lino = 0;
int64_t curOwner = 0;
*pRes = NULL;
@ -846,7 +845,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
if (code) {
pTaskInfo->code = code;
qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
blockDataCheck(*pRes, false);

View File

@ -1301,10 +1301,17 @@ FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOpera
freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
pOperator->pDownstreamGetParams[idx] = NULL;
}
if (code) {
qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
}
return code;
}
code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
if (code) {
qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
}
return code;
}

View File

@ -86,11 +86,13 @@ static void destroyGroupOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
cleanupExprSupp(&pInfo->scalarSup);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupGroupResInfo(&pInfo->groupResInfo);
cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param);

View File

@ -67,6 +67,9 @@ int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}

View File

@ -1380,8 +1380,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
if (code != TSDB_CODE_SUCCESS) {
taosRUnLockLatch(&pTaskInfo->lock);
lino = __LINE__;
goto _end;
TSDB_CHECK_CODE(code, lino, _end);
}
if (pInfo->currentTable >= numOfTables) {
@ -1393,11 +1392,11 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
if (!tmp) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
taosRUnLockLatch(&pTaskInfo->lock);
(*ppRes) = NULL;
return terrno;
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
tInfo = *tmp;
taosRUnLockLatch(&pTaskInfo->lock);
@ -1412,11 +1411,12 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
}
} else { // scan table group by group sequentially
code = groupSeqTableScan(pOperator, ppRes);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
@ -5834,9 +5834,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STupleHandle* pTupleHandle = NULL;
blockDataCleanup(pResBlock);
STupleHandle* pTupleHandle = NULL;
while (1) {
while (1) {
pTupleHandle = NULL;

View File

@ -204,15 +204,18 @@ int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle)
* @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
* @param [in, out] pBlock the output block, the group id will be saved in it
* @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
* @retval NULL if no more tuples
*/
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t code = 0;
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
STupleHandle** pTupleHandle) {
QRY_PARAM_CHECK(pTupleHandle);
int32_t code = 0;
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
if (!retTuple) {
code = tsortNextTuple(pHandle, &retTuple);
if (code) {
return NULL;
qError("failed to get next tuple, code:%s", tstrerror(code));
return code;
}
}
@ -225,7 +228,8 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
&pInfo->pGroupIdCalc->lastKeysLen, retTuple);
}
bool emptyBlock = pBlock->info.rows == 0;
bool emptyBlock = (pBlock->info.rows == 0);
if (newGroup) {
if (!emptyBlock) {
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
@ -247,17 +251,20 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
}
}
return retTuple;
*pTupleHandle = retTuple;
return code;
}
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t lino = 0;
int32_t code = 0;
SSDataBlock* p = NULL;
int32_t lino = 0;
int32_t code = 0;
STupleHandle* pTupleHandle = NULL;
SSDataBlock* p = NULL;
code = tsortGetSortedDataBlock(pHandle, &p);
if (p == NULL || (code != 0)) {
return code;
@ -266,16 +273,15 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
code = blockDataEnsureCapacity(p, capacity);
QUERY_CHECK_CODE(code, lino, _error);
STupleHandle* pTupleHandle;
while (1) {
if (pInfo->pGroupIdCalc) {
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
} else {
code = tsortNextTuple(pHandle, &pTupleHandle);
}
if (pTupleHandle == NULL || code != 0) {
lino = __LINE__;
TSDB_CHECK_CODE(code, lino, _error);
if (pTupleHandle == NULL) {
break;
}
@ -320,7 +326,7 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
return code;
_error:
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
blockDataDestroy(p);
return code;
@ -330,6 +336,9 @@ int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}

View File

@ -1229,11 +1229,13 @@ static void destroyStateWindowOperatorInfo(void* param) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupExprSupp(&pInfo->scalarSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
@ -1251,13 +1253,17 @@ void destroyIntervalOperatorInfo(void* param) {
if (param == NULL) {
return;
}
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);
@ -1265,6 +1271,7 @@ void destroyIntervalOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pInterpCols);
pInfo->pInterpCols = NULL;
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
pInfo->pPrevValues = NULL;
@ -1358,6 +1365,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = terrno;
lino = __LINE__;
goto _error;
}
@ -1465,8 +1473,10 @@ _error:
if (pInfo != NULL) {
destroyIntervalOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
return code;
}
@ -1754,11 +1764,13 @@ void destroySWindowOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
if (pInfo->pOperator) {
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);

View File

@ -771,7 +771,7 @@ static int32_t getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam*
code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
if (code != TSDB_CODE_SUCCESS) {
return terrno = code;
return code;
}
if (pHandle->pDataBlock->info.rows >= capacity) {
@ -2869,6 +2869,7 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
pHandle->tupleHandle.pBlock = NULL;
return code;
}
pHandle->tupleHandle.pBlock = pBlock;
pHandle->tupleHandle.rowIndex = 0;
}
@ -2884,8 +2885,7 @@ int32_t tsortOpen(SSortHandle* pHandle) {
}
if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
code = TSDB_CODE_INVALID_PARA;
return code;
return TSDB_CODE_INVALID_PARA;
}
pHandle->opened = true;

View File

@ -164,7 +164,6 @@ extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
extern int32_t streamMetaId;
int32_t streamTimerInit();
void streamTimerCleanUp();