refactor: check return value for stream.

This commit is contained in:
Haojun Liao 2024-09-13 22:43:55 +08:00
parent a33015e712
commit 99dbb78992
4 changed files with 54 additions and 69 deletions

View File

@ -102,7 +102,6 @@ int32_t tqOpen(const char* path, SVnode* pVnode) {
int32_t tqInitialize(STQ* pTq) { int32_t tqInitialize(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1,
tqStartTaskCompleteCallback, &pTq->pStreamMeta); tqStartTaskCompleteCallback, &pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -110,7 +109,6 @@ int32_t tqInitialize(STQ* pTq) {
} }
streamMetaLoadAllTasks(pTq->pStreamMeta); streamMetaLoadAllTasks(pTq->pStreamMeta);
return tqMetaOpen(pTq); return tqMetaOpen(pTq);
} }
@ -714,7 +712,6 @@ static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
STQ* pTq = (STQ*)pTqObj; STQ* pTq = (STQ*)pTqObj;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to build task", pTask->id.taskId); tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
@ -744,16 +741,25 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper; SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper;
pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
if (pOutputInfo->tbSink.pTSchema == NULL) { if (pOutputInfo->tbSink.pTSchema == NULL) {
return -1; return terrno;
} }
pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pOutputInfo->tbSink.pTblInfo == NULL) {
tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
return terrno;
}
tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr); tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr);
} }
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
if (pTask->exec.pWalReader == NULL) {
tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));
return terrno;
}
} }
streamTaskResetUpstreamStageInfo(pTask); streamTaskResetUpstreamStageInfo(pTask);

View File

@ -21,7 +21,7 @@
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec #define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec
static void processDownstreamReadyRsp(SStreamTask* pTask); static void processDownstreamReadyRsp(SStreamTask* pTask);
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
static void rspMonitorFn(void* param, void* tmrId); static void rspMonitorFn(void* param, void* tmrId);
static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
@ -226,13 +226,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else { } else {
stError( stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed", "downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
} }
streamMetaAddFailedTaskSelf(pTask, now); streamMetaAddFailedTaskSelf(pTask, now);
@ -371,12 +371,14 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
} }
} }
void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;;
bool existed = false;
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) { for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
if (p == NULL) { if (p == NULL) {
@ -391,15 +393,19 @@ void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
if (!existed) { if (!existed) {
SDownstreamTaskEpset t = {.nodeId = nodeId}; SDownstreamTaskEpset t = {.nodeId = nodeId};
void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t);
if (p == NULL) { if (p == NULL) {
// todo let's retry code = terrno;
stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, tstrerror(code));
} else {
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
vgId, t.nodeId, (num + 1));
} }
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
t.nodeId, (num + 1));
} }
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
return code;
} }
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
@ -629,6 +635,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
int32_t code = 0;
pInfo->timeoutStartTs = taosGetTimestampMs(); pInfo->timeoutStartTs = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTimeout; ++i) { for (int32_t i = 0; i < numOfTimeout; ++i) {
@ -640,14 +647,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
int32_t taskId = *px; int32_t taskId = *px;
SDownstreamStatusInfo* p = NULL; SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, taskId, &p); findCheckRspStatus(pInfo, taskId, &p);
if (p != NULL) { if (p != NULL) {
if (p->status != -1 || p->rspTs != 0) { if (p->status != -1 || p->rspTs != 0) {
stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, pTask->id.idStr, i, stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs);
p->status, p->rspTs);
continue; continue;
} }
code = doSendCheckMsg(pTask, p);
int32_t code = doSendCheckMsg(pTask, p);
} }
} }
@ -666,7 +672,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
SDownstreamStatusInfo* p = NULL; SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, *pTaskId, &p); findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) { if (p != NULL) {
addIntoNodeUpdateList(pTask, p->vgId); code = addIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
id, vgId, p->taskId, p->vgId); id, vgId, p->taskId, p->vgId);
} }

View File

@ -368,8 +368,9 @@ void streamMetaRemoveDB(void* arg, char* key) {
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId, int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
int64_t stage, startComplete_fn_t fn, SStreamMeta** p) { int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
int32_t code = 0;
QRY_PARAM_CHECK(p); QRY_PARAM_CHECK(p);
int32_t code = 0;
int32_t lino = 0;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
@ -379,23 +380,18 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
int32_t len = strlen(path) + 64; int32_t len = strlen(path) + 64;
char* tpath = taosMemoryCalloc(1, len); char* tpath = taosMemoryCalloc(1, len);
if (tpath == NULL) { TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);
code = terrno;
goto _err;
}
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath; pMeta->path = tpath;
code = streamMetaOpenTdb(pMeta); code = streamMetaOpenTdb(pMeta);
if (code != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) { if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId, stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
tstrerror(terrno)); tstrerror(terrno));
goto _err; TSDB_CHECK_CODE(code, lino, _err);
} }
if ((code = streamMetaBegin(pMeta) < 0)) { if ((code = streamMetaBegin(pMeta) < 0)) {
@ -405,28 +401,17 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK); pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasksMap == NULL) { TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK); pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->updateInfo.pTasks == NULL) { TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = streamMetaInitStartInfo(&pMeta->startInfo); code = streamMetaInitStartInfo(&pMeta->startInfo);
if (code) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
// task list // task list
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
if (pMeta->pTaskList == NULL) { TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->scanInfo.scanCounter = 0; pMeta->scanInfo.scanCounter = 0;
pMeta->vgId = vgId; pMeta->vgId = vgId;
@ -440,59 +425,47 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->startInfo.completeFn = fn; pMeta->startInfo.completeFn = fn;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);
pMeta->numOfPausedTasks = 0; pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0; pMeta->numOfStreamTasks = 0;
pMeta->closeFlag = false; pMeta->closeFlag = false;
stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
pMeta->rid = taosAddRef(streamMetaId, pMeta); pMeta->rid = taosAddRef(streamMetaId, pMeta);
// set the attribute when running on Linux OS // set the attribute when running on Linux OS
TdThreadRwlockAttr attr; TdThreadRwlockAttr attr;
code = taosThreadRwlockAttrInit(&attr); code = taosThreadRwlockAttrInit(&attr);
if (code != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
#ifdef LINUX #ifdef LINUX
code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
if (code != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
#endif #endif
code = taosThreadRwlockInit(&pMeta->lock, &attr); code = taosThreadRwlockInit(&pMeta->lock, &attr);
if (code) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
code = taosThreadRwlockAttrDestroy(&attr); code = taosThreadRwlockAttrDestroy(&attr);
if (code) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
code = metaRefMgtAdd(pMeta->vgId, pRid); code = metaRefMgtAdd(pMeta->vgId, pRid);
if (code) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
code = createMetaHbInfo(pRid, &pMeta->pHbInfo); code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
if (code != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno);
code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
if (code != 0) { TSDB_CHECK_CODE(code, lino, _err);
goto _err;
}
code = taosThreadMutexInit(&pMeta->backendMutex, NULL); code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
TSDB_CHECK_CODE(code, lino, _err);
*p = pMeta; *p = pMeta;
return code; return code;
@ -526,9 +499,10 @@ _err:
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet); if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt); if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
stError("failed to open stream meta, reason:%s", tstrerror(terrno)); stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
return code; return code;
} }
@ -1274,7 +1248,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
void streamMetaStartHb(SStreamMeta* pMeta) { void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
if (pRid == NULL) { if (pRid == NULL) {
stError("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId); stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
return; return;
} }

View File

@ -487,8 +487,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
STaskOutputInfo* pOutputInfo = &pTask->outputInfo; STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pOutputInfo->pTokenBucket == NULL) { if (pOutputInfo->pTokenBucket == NULL) {
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno; return terrno;
} }