From 4d6f143146576954ddc54f4aceac1ed7a6214b86 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Oct 2023 19:18:26 +0800 Subject: [PATCH] fix(stream): add missing refactor. --- source/libs/stream/src/streamTask.c | 121 +++++++++++++++------------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8dffbec09f..b6a60e28d7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -20,6 +20,8 @@ #include "ttimer.h" #include "wal.h" +static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); + static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); pTask->info.selfChildId = childId; @@ -48,7 +50,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.taskStatus = (fillHistory || hasFillhistory)? TASK_STATUS__SCAN_HISTORY:TASK_STATUS__NORMAL; pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; - pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; if (fillHistory) { ASSERT(hasFillhistory); @@ -113,10 +115,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; - int32_t epSz = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; } @@ -125,20 +127,20 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1; - if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; + if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tEncodeI32(pEncoder, pTask->fixedDispatcher.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->fixedDispatcher.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->fixedDispatcher.epSet) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; + if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; @@ -189,7 +191,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t epSz = -1; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; - pTask->pUpstreamInfoList = taosArrayInit(epSz, POINTER_BYTES); + pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES); for (int32_t i = 0; i < epSz; i++) { SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); if (pInfo == NULL) return -1; @@ -197,7 +199,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { taosMemoryFreeClear(pInfo); return -1; } - taosArrayPush(pTask->pUpstreamInfoList, &pInfo); + taosArrayPush(pTask->upstreamInfo.pList, &pInfo); } if (pTask->info.taskLevel != TASK_LEVEL__SINK) { @@ -205,22 +207,22 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1; - pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->tbSink.pSchemaWrapper == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; + pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->fixedDispatcher.epSet) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; + if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; @@ -331,8 +333,8 @@ void tFreeStreamTask(SStreamTask* pTask) { streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId); } - if (pTask->outputInfo.queue) { - streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId); + if (pTask->outputq.queue) { + streamQueueClose(pTask->outputq.queue, pTask->id.taskId); } if (pTask->exec.qmsg) { @@ -356,11 +358,11 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); - taosMemoryFree(pTask->tbSink.pTSchema); - tSimpleHashCleanup(pTask->tbSink.pTblInfo); + tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper); + taosMemoryFree(pTask->outputInfo.tbSink.pTSchema); + tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds); } @@ -382,13 +384,10 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pRspMsgList = NULL; } - if (pTask->pUpstreamInfoList != NULL) { - taosArrayDestroyEx(pTask->pUpstreamInfoList, freeUpstreamItem); - pTask->pUpstreamInfoList = NULL; - } + streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); - taosMemoryFree(pTask->pTokenBucket); + taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); @@ -401,16 +400,16 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.timerActive = 0; pTask->inputInfo.queue = streamQueueOpen(512 << 10); - pTask->outputInfo.queue = streamQueueOpen(512 << 10); + pTask->outputq.queue = streamQueueOpen(512 << 10); - if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) { + if (pTask->inputInfo.queue == NULL || pTask->outputq.queue == NULL) { stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); return TSDB_CODE_OUT_OF_MEMORY; } pTask->execInfo.created = taosGetTimestampMs(); pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; - pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; pTask->chkInfo.checkpointVer = ver - 1; @@ -420,15 +419,15 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->pMsgCb = pMsgCb; pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); - pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); - if (pTask->pTokenBucket == NULL) { + pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); + if (pTask->outputInfo.pTokenBucket == NULL) { stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return TSDB_CODE_OUT_OF_MEMORY; } // 2MiB per second for sink task // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2); + streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, 2); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); @@ -457,7 +456,7 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__TABLE) { return 1; } else { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; return taosArrayGetSize(vgInfo); } } @@ -485,11 +484,11 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre return TSDB_CODE_OUT_OF_MEMORY; } - if (pTask->pUpstreamInfoList == NULL) { - pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES); + if (pTask->upstreamInfo.pList == NULL) { + pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES); } - taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo); + taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo); return TSDB_CODE_SUCCESS; } @@ -497,9 +496,9 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS char buf[512] = {0}; EPSET_TO_STR(pEpSet, buf); - int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (pInfo->nodeId == nodeId) { epsetAssign(&pInfo->epSet, pEpSet); stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, @@ -509,8 +508,16 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS } } +void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) { + if (pUpstreamInfo->pList != NULL) { + taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem); + pUpstreamInfo->numOfClosed = 0; + pUpstreamInfo->pList = NULL; + } +} + void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) { - STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher; + STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; pDispatcher->taskId = pDownstreamTask->id.taskId; pDispatcher->nodeId = pDownstreamTask->info.nodeId; pDispatcher->epSet = pDownstreamTask->info.epSet; @@ -525,7 +532,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE int8_t type = pTask->outputInfo.type; if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(pVgs); for (int32_t i = 0; i < numOfVgroups; i++) { @@ -539,7 +546,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE } } } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { - STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher; + STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; if (pDispatcher->nodeId == nodeId) { epsetAssign(&pDispatcher->epSet, pEpSet); stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId, @@ -620,15 +627,19 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { return; } - int32_t size = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < size; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); pInfo->stage = -1; } stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); } +bool streamTaskAllUpstreamClosed(SStreamTask* pTask) { + return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList); +} + bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { bool ret = false;