From 99dbb78992f3be1477f0e0b91aade9d6977f45a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 Sep 2024 22:43:55 +0800 Subject: [PATCH] refactor: check return value for stream. --- source/dnode/vnode/src/tq/tq.c | 16 +++-- source/libs/stream/src/streamCheckStatus.c | 34 ++++++----- source/libs/stream/src/streamMeta.c | 70 +++++++--------------- source/libs/stream/src/streamTask.c | 3 +- 4 files changed, 54 insertions(+), 69 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2d1d3ed357..a2c088de68 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -102,7 +102,6 @@ int32_t tqOpen(const char* path, SVnode* pVnode) { int32_t tqInitialize(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); - int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, tqStartTaskCompleteCallback, &pTq->pStreamMeta); if (code != TSDB_CODE_SUCCESS) { @@ -110,7 +109,6 @@ int32_t tqInitialize(STQ* pTq) { } streamMetaLoadAllTasks(pTq->pStreamMeta); - return tqMetaOpen(pTq); } @@ -713,8 +711,7 @@ end: static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { - STQ* pTq = (STQ*)pTqObj; - + STQ* pTq = (STQ*)pTqObj; int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to build task", pTask->id.taskId); @@ -744,16 +741,25 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper; pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); if (pOutputInfo->tbSink.pTSchema == NULL) { - return -1; + return terrno; } pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (pOutputInfo->tbSink.pTblInfo == NULL) { + tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno)); + 
return terrno; + } + tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr); } if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); + if (pTask->exec.pWalReader == NULL) { + tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno)); + return terrno; + } } streamTaskResetUpstreamStageInfo(pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 540199bb90..2bfab82805 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -21,7 +21,7 @@ #define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec static void processDownstreamReadyRsp(SStreamTask* pTask); -static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); +static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); static void rspMonitorFn(void* param, void* tmrId); static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); @@ -226,13 +226,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); - addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); + code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else { stError( "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "downstream again, nodeUpdate needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); 
+ code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } streamMetaAddFailedTaskSelf(pTask, now); @@ -371,12 +371,14 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { } } -void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { +int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t vgId = pTask->pMeta->vgId; + int32_t code = 0; + bool existed = false; streamMutexLock(&pTask->lock); + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); - bool existed = false; for (int i = 0; i < num; ++i) { SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); if (p == NULL) { @@ -391,15 +393,19 @@ void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { if (!existed) { SDownstreamTaskEpset t = {.nodeId = nodeId}; - void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); + + void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); if (p == NULL) { - // todo let's retry + code = terrno; + stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + } else { + stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, + vgId, t.nodeId, (num + 1)); } - stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, - t.nodeId, (num + 1)); } streamMutexUnlock(&pTask->lock); + return code; } void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { @@ -629,6 +635,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); + int32_t code = 0; pInfo->timeoutStartTs = taosGetTimestampMs(); for (int32_t i = 0; i < numOfTimeout; ++i) { @@ -640,14 +647,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { 
int32_t taskId = *px; SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, taskId, &p); + if (p != NULL) { if (p->status != -1 || p->rspTs != 0) { - stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, pTask->id.idStr, i, - p->status, p->rspTs); + stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs); continue; } - - int32_t code = doSendCheckMsg(pTask, p); + code = doSendCheckMsg(pTask, p); } } @@ -666,7 +672,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, *pTaskId, &p); if (p != NULL) { - addIntoNodeUpdateList(pTask, p->vgId); + code = addIntoNodeUpdateList(pTask, p->vgId); stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", id, vgId, p->taskId, p->vgId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0417fb2182..514e25c689 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -368,8 +368,9 @@ void streamMetaRemoveDB(void* arg, char* key) { int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId, int64_t stage, startComplete_fn_t fn, SStreamMeta** p) { - int32_t code = 0; QRY_PARAM_CHECK(p); + int32_t code = 0; + int32_t lino = 0; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -379,23 +380,18 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, int32_t len = strlen(path) + 64; char* tpath = taosMemoryCalloc(1, len); - if (tpath == NULL) { - code = terrno; - goto _err; - } + TSDB_CHECK_NULL(tpath, code, lino, _err, terrno); sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); pMeta->path = tpath; code = streamMetaOpenTdb(pMeta); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TSDB_CHECK_CODE(code, 
lino, _err); if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) { stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId, tstrerror(terrno)); - goto _err; + TSDB_CHECK_CODE(code, lino, _err); } if ((code = streamMetaBegin(pMeta) < 0)) { @@ -405,28 +401,17 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK); - if (pMeta->pTasksMap == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno); pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK); - if (pMeta->updateInfo.pTasks == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno); code = streamMetaInitStartInfo(&pMeta->startInfo); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); // task list pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); - if (pMeta->pTaskList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno); pMeta->scanInfo.scanCounter = 0; pMeta->vgId = vgId; @@ -440,59 +425,47 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno); pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; pMeta->closeFlag = false; stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); - pMeta->rid = taosAddRef(streamMetaId, pMeta); // set the attribute when running on Linux OS TdThreadRwlockAttr attr; code = taosThreadRwlockAttrInit(&attr); - if (code != TSDB_CODE_SUCCESS) { 
- goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); #ifdef LINUX code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); #endif code = taosThreadRwlockInit(&pMeta->lock, &attr); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); code = taosThreadRwlockAttrDestroy(&attr); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); code = metaRefMgtAdd(pMeta->vgId, pRid); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); code = createMetaHbInfo(pRid, &pMeta->pHbInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); + TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno); code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); - if (code != 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); code = taosThreadMutexInit(&pMeta->backendMutex, NULL); + TSDB_CHECK_CODE(code, lino, _err); *p = pMeta; return code; @@ -526,9 +499,10 @@ _err: if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet); if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt); + taosMemoryFree(pMeta); - stError("failed to open stream meta, reason:%s", tstrerror(terrno)); + stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code)); return code; } @@ -1274,7 +1248,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { void streamMetaStartHb(SStreamMeta* pMeta) { int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); if (pRid == NULL) { - stError("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId); + stFatal("vgId:%d 
failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId); return; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fb2456e1cd..9a324084ff 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -487,8 +487,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i STaskOutputInfo* pOutputInfo = &pTask->outputInfo; pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); if (pOutputInfo->pTokenBucket == NULL) { - stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno)); return terrno; }