refactor: pass error code to invoker.

This commit is contained in:
Haojun Liao 2024-09-13 19:35:02 +08:00
parent 83d531284b
commit a33015e712
7 changed files with 35 additions and 20 deletions

View File

@ -976,6 +976,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -1007,9 +1007,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if (code) {
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
// let's continue scan data in the wal files
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) {
code = tqScanWalAsync(pTq, false); // it's ok to failed
if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));

View File

@ -351,11 +351,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
// retry
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return code;
return terrno;
}
(void)memcpy(data, pBody, len);

View File

@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
@ -40,7 +40,7 @@ int32_t tqScanWal(STQ* pTq) {
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
if (code) {
tqError("vgId:%d failed to start all tasks, try next time", vgId);
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
return code;
}
@ -293,9 +293,11 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
return true;
}
bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
const char* id = pTask->id.idStr;
int32_t numOfNewItems = 0;
int32_t code = 0;
*pSucc = false;
while (1) {
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
@ -304,7 +306,7 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
}
SStreamQueueItem* pItem = NULL;
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
@ -327,10 +329,17 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
break;
}
} else {
tqTrace("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
if (code == TSDB_CODE_OUT_OF_MEMORY) {
tqError("s-task:%s failed to put data into inputQ, since out of memory");
} else {
tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
pTask->chkInfo.nextProcessVer);
code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
if (code) {
tqError("s-task:%s failed to seek ver to:%"PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
}
code = 0; // reset the error code
}
break;
@ -339,7 +348,8 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
}
*numOfItems += numOfNewItems;
return numOfNewItems > 0;
*pSucc = (numOfNewItems > 0);
return code;
}
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
@ -358,6 +368,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
streamMetaWUnLock(pStreamMeta);
if (pTaskList == NULL) {
tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
return terrno;
}
@ -405,7 +416,8 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems);
bool hasNewData = false;
code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
streamMutexUnlock(&pTask->lock);
if ((numOfItems > 0) || hasNewData) {

View File

@ -78,7 +78,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr);
return -1;
return terrno;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
@ -840,7 +840,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t idle = taosGetTimestampMs() - execTs;
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
streamResumeTask(pTask);
code = streamResumeTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
@ -849,7 +849,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
streamMetaReleaseTask(pMeta, pTask);
}
return 0;
return code;
}
SStreamTask* pTask = NULL;

View File

@ -287,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_STREAM_INPUTQ_FULL;
}
int32_t msgLen = px->submit.msgLen;
@ -312,7 +312,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamFreeQitem(pItem);
return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_STREAM_INPUTQ_FULL;
}
int32_t code = taosWriteQitem(pQueue, pItem);

View File

@ -816,6 +816,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")