fix(stream): add missing refactor.

This commit is contained in:
Haojun Liao 2023-10-07 19:18:26 +08:00
parent ab17bfdf09
commit 4d6f143146
1 changed files with 66 additions and 55 deletions

View File

@ -20,6 +20,8 @@
#include "ttimer.h"
#include "wal.h"
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->info.selfChildId = childId;
@ -48,7 +50,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = (fillHistory || hasFillhistory)? TASK_STATUS__SCAN_HISTORY:TASK_STATUS__NORMAL;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
if (fillHistory) {
ASSERT(hasFillhistory);
@ -113,10 +115,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
int32_t epSz = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
@ -125,20 +127,20 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tEncodeI32(pEncoder, pTask->fixedDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->fixedDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedDispatcher.epSet) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
@ -189,7 +191,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t epSz = -1;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->pUpstreamInfoList = taosArrayInit(epSz, POINTER_BYTES);
pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES);
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1;
@ -197,7 +199,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
taosMemoryFreeClear(pInfo);
return -1;
}
taosArrayPush(pTask->pUpstreamInfoList, &pInfo);
taosArrayPush(pTask->upstreamInfo.pList, &pInfo);
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
@ -205,22 +207,22 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedDispatcher.epSet) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
@ -331,8 +333,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId);
}
if (pTask->outputInfo.queue) {
streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
if (pTask->outputq.queue) {
streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
}
if (pTask->exec.qmsg) {
@ -356,11 +358,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);
}
@ -382,13 +384,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pRspMsgList = NULL;
}
if (pTask->pUpstreamInfoList != NULL) {
taosArrayDestroyEx(pTask->pUpstreamInfoList, freeUpstreamItem);
pTask->pUpstreamInfoList = NULL;
}
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
taosMemoryFree(pTask->pTokenBucket);
taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask);
@ -401,16 +400,16 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->inputInfo.queue = streamQueueOpen(512 << 10);
pTask->outputInfo.queue = streamQueueOpen(512 << 10);
pTask->outputq.queue = streamQueueOpen(512 << 10);
if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) {
if (pTask->inputInfo.queue == NULL || pTask->outputq.queue == NULL) {
stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->execInfo.created = taosGetTimestampMs();
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
pTask->chkInfo.checkpointVer = ver - 1;
@ -420,15 +419,15 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->pMsgCb = pMsgCb;
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pTask->pTokenBucket == NULL) {
pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pTask->outputInfo.pTokenBucket == NULL) {
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2);
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, 2);
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);
@ -457,7 +456,7 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__TABLE) {
return 1;
} else {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
return taosArrayGetSize(vgInfo);
}
}
@ -485,11 +484,11 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pTask->pUpstreamInfoList == NULL) {
pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
if (pTask->upstreamInfo.pList == NULL) {
pTask->upstreamInfo.pList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo);
taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
@ -497,9 +496,9 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->nodeId == nodeId) {
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
@ -509,8 +508,16 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
}
}
void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) {
if (pUpstreamInfo->pList != NULL) {
taosArrayDestroyEx(pUpstreamInfo->pList, freeUpstreamItem);
pUpstreamInfo->numOfClosed = 0;
pUpstreamInfo->pList = NULL;
}
}
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher;
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
pDispatcher->taskId = pDownstreamTask->id.taskId;
pDispatcher->nodeId = pDownstreamTask->info.nodeId;
pDispatcher->epSet = pDownstreamTask->info.epSet;
@ -525,7 +532,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
@ -539,7 +546,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
}
}
} else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher;
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
if (pDispatcher->nodeId == nodeId) {
epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
@ -620,15 +627,19 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
return;
}
int32_t size = taosArrayGetSize(pTask->pUpstreamInfoList);
int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < size; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
pInfo->stage = -1;
}
stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
}
bool streamTaskAllUpstreamClosed(SStreamTask* pTask) {
return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
}
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
bool ret = false;