fix mem leak
This commit is contained in:
parent
4f5359f684
commit
a92602b667
|
@ -34,7 +34,6 @@ extern "C" {
|
|||
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
||||
#define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F)
|
||||
|
||||
|
||||
#define TASK_DOWNSTREAM_READY 0x0
|
||||
#define TASK_DOWNSTREAM_NOT_READY 0x1
|
||||
#define TASK_DOWNSTREAM_NOT_LEADER 0x2
|
||||
|
@ -842,7 +841,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
|||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
|
||||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
|
||||
void streamTaskClearCheckInfo(SStreamTask* pTask);
|
||||
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
|
||||
int32_t streamAlignTransferState(SStreamTask* pTask);
|
||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
||||
|
|
|
@ -714,8 +714,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
taosWLockLatch(&pTq->lock);
|
||||
bool exec = tqIsHandleExec(pHandle);
|
||||
if (exec) {
|
||||
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
|
||||
pHandle->subKey, pHandle);
|
||||
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
|
||||
pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
taosMsleep(10);
|
||||
continue;
|
||||
|
@ -922,7 +922,8 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
// only the leader node handle the check request
|
||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
||||
tqError(
|
||||
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
||||
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
|
||||
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
|
||||
} else {
|
||||
|
@ -933,7 +934,8 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
char* p = NULL;
|
||||
streamTaskGetStatus(pTask, &p);
|
||||
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
|
||||
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64
|
||||
") task:0x%x (vgId:%d), check_status:%d",
|
||||
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
} else {
|
||||
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
||||
|
@ -1029,7 +1031,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
streamMetaWUnLock(pStreamMeta);
|
||||
|
||||
if (code < 0) {
|
||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
|
||||
tstrerror(code));
|
||||
tFreeStreamTask(pTask);
|
||||
return code;
|
||||
}
|
||||
|
@ -1484,7 +1487,6 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
|||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SINK) {
|
||||
if (status == TASK_STATUS__UNINIT) {
|
||||
|
||||
}
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
|
@ -1710,7 +1712,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
pTask->checkpointingId = req.checkpointId;
|
||||
|
||||
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
||||
", set it failure", pTask->id.idStr, req.checkpointId);
|
||||
", set it failure",
|
||||
pTask->id.idStr, req.checkpointId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
|
@ -1931,7 +1934,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
if (!pTq->pVnode->restored) {
|
||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag",
|
||||
vgId);
|
||||
pMeta->startInfo.tasksWillRestart = 0;
|
||||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
|
@ -2003,8 +2007,8 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||
pReq->taskId);
|
||||
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, pReq->taskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2012,8 +2016,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
|
||||
streamTaskClearCheckInfo(pTask);
|
||||
taosArrayClear(pTask->pReadyMsgList);
|
||||
streamTaskClearCheckInfo(pTask, true);
|
||||
streamTaskSetStatusReady(pTask);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,9 +18,9 @@
|
|||
|
||||
#include "executor.h"
|
||||
#include "query.h"
|
||||
#include "tstream.h"
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "trpc.h"
|
||||
#include "tstream.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -113,19 +113,23 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
|||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
int32_t* blockSize);
|
||||
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
||||
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
||||
const char* streamQueueItemGetTypeStr(int32_t type);
|
||||
|
||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
|
||||
int32_t* pLen);
|
||||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||
|
||||
void streamClearChkptReadyMsg(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||
|
|
|
@ -13,9 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "streamInt.h"
|
||||
#include "rsync.h"
|
||||
#include "cos.h"
|
||||
#include "rsync.h"
|
||||
#include "streamInt.h"
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -198,7 +198,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|||
// todo fix race condition: set the status and append checkpoint block
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
if (taskLevel == TASK_LEVEL__SOURCE) {
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
|
||||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
||||
continueDispatchCheckpointBlock(pBlock, pTask);
|
||||
} else { // only one task exists, no need to dispatch downstream info
|
||||
|
@ -234,7 +235,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|||
} else {
|
||||
stDebug(
|
||||
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
|
||||
"downstream", id, num);
|
||||
"downstream",
|
||||
id, num);
|
||||
|
||||
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
||||
// can start local checkpoint procedure
|
||||
|
@ -272,13 +274,17 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void streamTaskClearCheckInfo(SStreamTask* pTask) {
|
||||
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
|
||||
pTask->checkpointingId = 0; // clear the checkpoint id
|
||||
pTask->chkInfo.failedId = 0;
|
||||
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
||||
pTask->checkpointNotReadyTasks = 0;
|
||||
pTask->checkpointAlignCnt = 0;
|
||||
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
||||
|
||||
if (clearChkpReadyMsg) {
|
||||
streamClearChkptReadyMsg(pTask);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||
|
@ -305,7 +311,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
|||
p->chkInfo.checkpointId = p->checkpointingId;
|
||||
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
||||
|
||||
streamTaskClearCheckInfo(p);
|
||||
streamTaskClearCheckInfo(p, false);
|
||||
|
||||
char* str = NULL;
|
||||
streamTaskGetStatus(p, &str);
|
||||
|
@ -385,8 +391,7 @@ static int uploadCheckpointToS3(char* id, char* path){
|
|||
TdDirEntryPtr de = NULL;
|
||||
while ((de = taosReadDir(pDir)) != NULL) {
|
||||
char* name = taosGetDirEntryName(de);
|
||||
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 ||
|
||||
taosDirEntryIsDir(de)) continue;
|
||||
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
|
||||
|
||||
char filename[PATH_MAX] = {0};
|
||||
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
*/
|
||||
|
||||
#include "streamInt.h"
|
||||
#include "tmisce.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "tmisce.h"
|
||||
|
||||
typedef struct SBlockName {
|
||||
uint32_t hashValue;
|
||||
|
@ -343,7 +343,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
|||
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||
|
||||
// TODO: do not use broadcast
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT ||
|
||||
pDataBlock->info.type == STREAM_TRANS_STATE) {
|
||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
|
||||
if (code != 0) {
|
||||
|
@ -393,8 +394,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
|
|||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||
|
||||
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
|
||||
id, pTask->info.selfChildId, numOfVgroups, msgId);
|
||||
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
|
||||
pTask->info.selfChildId, numOfVgroups, msgId);
|
||||
|
||||
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||
if (pDispatchMsg[i].blockNum > 0) {
|
||||
|
@ -409,7 +410,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
|
|||
}
|
||||
}
|
||||
|
||||
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
|
||||
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups,
|
||||
msgId);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -441,8 +443,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
|||
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||
|
||||
int32_t numOfFailed = taosArrayGetSize(pList);
|
||||
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
|
||||
id, pTask->info.selfChildId, numOfFailed, msgId);
|
||||
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
|
||||
pTask->info.selfChildId, numOfFailed, msgId);
|
||||
|
||||
for (int32_t i = 0; i < numOfFailed; i++) {
|
||||
int32_t vgId = *(int32_t*)taosArrayGet(pList, i);
|
||||
|
@ -461,7 +463,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
|||
}
|
||||
}
|
||||
|
||||
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId);
|
||||
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
|
||||
numOfFailed, msgId);
|
||||
} else {
|
||||
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
|
||||
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
|
||||
|
@ -531,10 +534,12 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
}
|
||||
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
|
||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
|
||||
pDataBlock->info.parTbName);
|
||||
} else {
|
||||
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
|
||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
|
||||
pDataBlock->info.parTbName);
|
||||
}
|
||||
|
||||
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
|
||||
|
@ -576,13 +581,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
}
|
||||
|
||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
||||
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
|
||||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
||||
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
|
||||
if (numOfElems > 0) {
|
||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
|
||||
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
|
||||
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id,
|
||||
numOfElems, size);
|
||||
}
|
||||
|
||||
// to make sure only one dispatch is running
|
||||
|
@ -635,7 +642,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
|
||||
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
|
||||
stDebug(
|
||||
"s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
|
||||
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||
|
||||
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
|
@ -659,7 +667,8 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
|||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||
pTask->notReadyTasks = 1;
|
||||
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
||||
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
|
||||
&pTask->outputInfo.fixedDispatcher.epSet);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||
|
@ -667,8 +676,8 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
|||
|
||||
char* p = NULL;
|
||||
streamTaskGetStatus(pTask, &p);
|
||||
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
|
||||
numOfVgs, p);
|
||||
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
|
||||
pTask->id.idStr, numOfVgs, p);
|
||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
req.downstreamTaskId = pVgInfo->taskId;
|
||||
|
@ -696,7 +705,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
taosArrayClear(pTask->pReadyMsgList);
|
||||
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, num);
|
||||
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
|
||||
num);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -866,8 +876,8 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
|
||||
SStreamTask* pTask, int8_t isSucceed) {
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
||||
int8_t isSucceed) {
|
||||
SStreamChkptReadyInfo info = {0};
|
||||
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed);
|
||||
|
||||
|
@ -876,7 +886,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
|
|||
}
|
||||
|
||||
taosArrayPush(pTask->pReadyMsgList, &info);
|
||||
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pReadyMsgList));
|
||||
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr,
|
||||
(int32_t)taosArrayGetSize(pTask->pReadyMsgList));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -937,6 +948,16 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
|||
return 0;
|
||||
}
|
||||
|
||||
void streamClearChkptReadyMsg(SStreamTask* pTask) {
|
||||
if (pTask->pReadyMsgList == NULL) return;
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(pTask->pReadyMsgList); i++) {
|
||||
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
|
||||
rpcFreeCont(pInfo->msg.pCont);
|
||||
}
|
||||
taosArrayClear(pTask->pReadyMsgList);
|
||||
}
|
||||
|
||||
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
|
@ -959,7 +980,8 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen) {
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
|
||||
int32_t* pLen) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
SEncoder encoder;
|
||||
|
@ -1104,7 +1126,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
|
||||
// follower not handle the dispatch rsp
|
||||
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
|
||||
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
|
||||
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,
|
||||
vgId);
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
@ -1122,8 +1145,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
// flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
||||
// happened too fast.
|
||||
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
|
||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
||||
} else {
|
||||
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
||||
|
@ -1154,7 +1177,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
ASSERT(leftRsp >= 0);
|
||||
|
||||
if (leftRsp > 0) {
|
||||
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting for %d rsp",
|
||||
stDebug(
|
||||
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting "
|
||||
"for %d rsp",
|
||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp);
|
||||
} else {
|
||||
stDebug(
|
||||
|
@ -1162,8 +1187,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s",
|
||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
|
||||
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s", id,
|
||||
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
|
||||
}
|
||||
|
||||
ASSERT(leftRsp >= 0);
|
||||
|
|
|
@ -15,11 +15,11 @@
|
|||
|
||||
#include "executor.h"
|
||||
#include "streamInt.h"
|
||||
#include "streamsm.h"
|
||||
#include "tmisce.h"
|
||||
#include "tstream.h"
|
||||
#include "ttimer.h"
|
||||
#include "wal.h"
|
||||
#include "streamsm.h"
|
||||
|
||||
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
||||
|
||||
|
@ -358,7 +358,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
walCloseReader(pTask->exec.pWalReader);
|
||||
}
|
||||
|
||||
streamClearChkptReadyMsg(pTask);
|
||||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||
|
||||
if (pTask->msgInfo.pData != NULL) {
|
||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||
pTask->msgInfo.pData = NULL;
|
||||
|
@ -442,7 +444,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
|
||||
pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
|
||||
if (pTask->outputInfo.pTokenBucket == NULL) {
|
||||
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr,
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -529,8 +532,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
|
|||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
if (pInfo->nodeId == nodeId) {
|
||||
epsetAssign(&pInfo->epSet, pEpSet);
|
||||
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
||||
pInfo->taskId, nodeId, buf);
|
||||
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId,
|
||||
nodeId, buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue