Merge branch '3.0' into fix/PI-23-3.0

This commit is contained in:
kailixu 2023-11-23 16:49:19 +08:00
commit 14f163a795
11 changed files with 346 additions and 271 deletions

View File

@ -34,7 +34,6 @@ extern "C" {
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
#define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F)
#define TASK_DOWNSTREAM_READY 0x0
#define TASK_DOWNSTREAM_NOT_READY 0x1
#define TASK_DOWNSTREAM_NOT_LEADER 0x2
@ -759,7 +758,8 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen)
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
@ -841,7 +841,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta);
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,

View File

@ -513,6 +513,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
if (tsNumOfTaskQueueThreads >= 50) {
tsNumOfTaskQueueThreads = 50;
}

View File

@ -84,8 +84,10 @@ typedef struct {
} SVnodeThread;
// vmInt.c
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal);

View File

@ -282,7 +282,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode != NULL && !pVnode->failed) {
if (pVnode != NULL) {
dError("vgId:%d, already exist", req.vgId);
tFreeSCreateVnodeReq(&req);
vmReleaseVnode(pMgmt, pVnode);
@ -291,10 +291,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
ASSERT(pVnode == NULL || pVnode->failed);
wrapperCfg.diskPrimary = pVnode ? pVnode->diskPrimary : vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
int32_t diskPrimary = wrapperCfg.diskPrimary;
int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
if (diskPrimary < 0) {
diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
}
wrapperCfg.diskPrimary = diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
@ -371,7 +372,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
TMSG_INFO(pMsg->msgType));
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
@ -489,7 +490,7 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.vgId, TMSG_INFO(pMsg->msgType));
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
@ -532,7 +533,7 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
@ -565,7 +566,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
req.dstVgId);
pVnode = vmAcquireVnode(pMgmt, srcVgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
@ -680,7 +681,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
@ -748,7 +749,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
if (pVnode == NULL) {
dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;

View File

@ -19,6 +19,19 @@
#include "vnd.h"
#include "libs/function/tudf.h"
// Look up the primary disk id recorded for the vnode registered under vgId.
//
// The hash lookup runs under the management read lock. No reference is taken
// on the vnode object and no state flags are checked: any entry present in
// the hash is accepted.
//
// pMgmt: vnode management object that owns the hash and its lock
// vgId:  vgroup id used as the hash key
// Returns the entry's diskPrimary, or -1 when the hash has no entry for vgId.
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(vgId), (void *)&pFound);
  // read the field while still holding the lock
  int32_t primary = (pFound == NULL) ? -1 : pFound->diskPrimary;
  taosThreadRwlockUnlock(&pMgmt->lock);

  return primary;
}
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
STfs *pTfs = pMgmt->pTfs;
int32_t diskId = 0;
@ -74,12 +87,12 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
return diskId;
}
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
SVnodeObj *pVnode = NULL;
taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL || pVnode->dropped) {
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
pVnode = NULL;
} else {
@ -91,6 +104,8 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
return pVnode;
}
// Acquire the vnode registered under vgId in strict mode.
// Thin wrapper that forwards to vmAcquireVnodeImpl() with strict == true.
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  const bool strict = true;
  return vmAcquireVnodeImpl(pMgmt, vgId, strict);
}
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return;
@ -100,6 +115,15 @@ void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
taosThreadRwlockUnlock(&pMgmt->lock);
}
// Release a vnode object together with its path string, then clear the
// caller's pointer so it cannot be reused after the free.
// A NULL handle, or a handle that points at NULL, is a no-op.
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
  if (ppVnode == NULL) return;

  SVnodeObj *pObj = *ppVnode;
  if (pObj == NULL) return;

  taosMemoryFree(pObj->path);
  taosMemoryFree(pObj);
  *ppVnode = NULL;
}
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
@ -134,6 +158,12 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
}
taosThreadRwlockWrlock(&pMgmt->lock);
SVnodeObj *pOld = NULL;
taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
if (pOld) {
ASSERT(pOld->failed);
vmFreeVnodeObj(&pOld);
}
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosThreadRwlockUnlock(&pMgmt->lock);
@ -223,8 +253,7 @@ _closed:
vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs);
}
taosMemoryFree(pVnode->path);
taosMemoryFree(pVnode);
vmFreeVnodeObj(&pVnode);
}
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
@ -621,7 +650,7 @@ static void *vmRestoreVnodeInThread(void *param) {
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SVnodeObj *pVnode = pThread->ppVnodes[v];
if (pVnode->failed) {
dError("vgId:%d, skip restoring vnode in failure mode.", pVnode->vgId);
dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
continue;
}

View File

@ -187,7 +187,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
pHead->vgId = ntohl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
terrno = (terrno != 0) ? terrno : -1;
@ -316,7 +316,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t size = -1;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode != NULL && !pVnode->failed) {
if (pVnode != NULL) {
switch (qtype) {
case WRITE_QUEUE:
size = taosQueueItemSize(pVnode->pWriteW.queue);

View File

@ -14,8 +14,8 @@
*/
#include "tq.h"
#include "vnd.h"
#include "stream.h"
#include "vnd.h"
typedef struct {
int8_t inited;
@ -83,7 +83,7 @@ void tqDestroyTqHandle(void* data) {
taosMemoryFree(pData->msg);
pData->msg = NULL;
}
if (pData->block != NULL){
if (pData->block != NULL) {
blockDataDestroy(pData->block);
}
}
@ -586,7 +586,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosWLockLatch(&pTq->lock);
bool exec = tqIsHandleExec(pHandle);
if(exec){
if (exec) {
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
pHandle->subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
@ -705,12 +705,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
taosWUnLockLatch(&pTq->lock);
} else {
while(1){
while (1) {
taosWLockLatch(&pTq->lock);
bool exec = tqIsHandleExec(pHandle);
if(exec){
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
pHandle->subKey, pHandle);
if (exec) {
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
taosMsleep(10);
continue;
@ -851,11 +851,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
}
// // reset the task status from unfinished transaction
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
// pTask->status.taskStatus = TASK_STATUS__READY;
// }
// // reset the task status from unfinished transaction
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
// pTask->status.taskStatus = TASK_STATUS__READY;
// }
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
@ -917,7 +917,8 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
tqError(
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
@ -928,7 +929,8 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* p = NULL;
streamTaskGetStatus(pTask, &p);
tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
@ -1024,7 +1026,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamMetaWUnLock(pStreamMeta);
if (code < 0) {
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
tstrerror(code));
tFreeStreamTask(pTask);
return code;
}
@ -1064,7 +1067,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
// if it's an source task, extract the last version in wal.
SVersionRange *pRange = &pTask->dataRange.range;
SVersionRange* pRange = &pTask->dataRange.range;
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
@ -1090,7 +1093,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files.
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
@ -1119,7 +1122,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamTaskGetStatus(pTask, &pStatus);
// avoid multi-thread exec
while(1) {
while (1) {
int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
if (sentinel != 0) {
tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
@ -1198,7 +1201,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
tqError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop related fill-history task:%s",
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
@ -1474,7 +1477,6 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
if (status == TASK_STATUS__UNINIT) {
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
@ -1586,10 +1588,10 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId);
// for test purpose
// if (req.type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
// goto FAIL;
// }
// if (req.type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
// goto FAIL;
// }
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
if (pTask != NULL) {
@ -1701,7 +1703,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
pTask->checkpointingId = req.checkpointId;
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
", set it failure", pTask->id.idStr, req.checkpointId);
", set it failure",
pTask->id.idStr, req.checkpointId);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
@ -1922,7 +1925,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaWUnLock(pMeta);
} else {
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag",
vgId);
pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta);
} else {
@ -1938,7 +1942,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while(1) {
while (1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
@ -1989,13 +1993,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*) pMsg->pCont;
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId);
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
@ -2003,8 +2007,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask);
taosArrayClear(pTask->pReadyMsgList);
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
}

View File

@ -18,9 +18,9 @@
#include "executor.h"
#include "query.h"
#include "tstream.h"
#include "streamBackendRocksdb.h"
#include "trpc.h"
#include "tstream.h"
#ifdef __cplusplus
extern "C" {
@ -114,19 +114,23 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
@ -140,7 +144,7 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
typedef enum UPLOAD_TYPE{
typedef enum UPLOAD_TYPE {
UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0,
UPLOAD_RSYNC = 1,

View File

@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInt.h"
#include "rsync.h"
#include "cos.h"
#include "rsync.h"
#include "streamInt.h"
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
@ -122,7 +122,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pChkpoint->blocks, pBlock);
taosMemoryFree(pBlock);
@ -196,10 +196,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
streamMetaWUnLock(pMeta);
}
//todo fix race condition: set the status and append checkpoint block
// todo fix race condition: set the status and append checkpoint block
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
@ -235,7 +236,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
} else {
stDebug(
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
"downstream", id, num);
"downstream",
id, num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure
@ -273,7 +275,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
return 0;
}
void streamTaskClearCheckInfo(SStreamTask* pTask) {
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->checkpointingId = 0; // clear the checkpoint id
pTask->chkInfo.failedId = 0;
pTask->chkInfo.startTs = 0; // clear the recorded start time
@ -281,6 +283,9 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
pTask->checkpointAlignCnt = 0;
pTask->chkInfo.dispatchCheckpointTrigger = false;
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask);
}
}
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
@ -307,7 +312,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
p->chkInfo.checkpointId = p->checkpointingId;
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
streamTaskClearCheckInfo(p);
streamTaskClearCheckInfo(p, false);
char* str = NULL;
streamTaskGetStatus(p, &str);
@ -380,27 +385,26 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
static int uploadCheckpointToS3(char* id, char* path){
static int uploadCheckpointToS3(char* id, char* path) {
TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 ||
taosDirEntryIsDir(de)) continue;
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
char filename[PATH_MAX] = {0};
if(path[strlen(path) - 1] == TD_DIRSEP_CHAR){
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
snprintf(filename, sizeof(filename), "%s%s", path, name);
}else{
} else {
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
}
char object[PATH_MAX] = {0};
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
if(s3PutObjectFromFile2(filename, object) != 0){
if (s3PutObjectFromFile2(filename, object) != 0) {
taosCloseDir(&pDir);
return -1;
}
@ -411,59 +415,59 @@ static int uploadCheckpointToS3(char* id, char* path){
return 0;
}
UPLOAD_TYPE getUploadType(){
if(strlen(tsSnodeAddress) != 0){
UPLOAD_TYPE getUploadType() {
if (strlen(tsSnodeAddress) != 0) {
return UPLOAD_RSYNC;
}else if(tsS3StreamEnabled){
} else if (tsS3StreamEnabled) {
return UPLOAD_S3;
}else{
} else {
return UPLOAD_DISABLE;
}
}
int uploadCheckpoint(char* id, char* path){
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
int uploadCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("uploadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeAddress) != 0){
if (strlen(tsSnodeAddress) != 0) {
return uploadRsync(id, path);
}else if(tsS3StreamEnabled){
} else if (tsS3StreamEnabled) {
return uploadCheckpointToS3(id, path);
}
return 0;
}
int downloadCheckpoint(char* id, char* path){
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
int downloadCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("downloadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeAddress) != 0){
if (strlen(tsSnodeAddress) != 0) {
return downloadRsync(id, path);
}else if(tsS3StreamEnabled){
} else if (tsS3StreamEnabled) {
return s3GetObjectsByPrefix(id, path);
}
return 0;
}
int deleteCheckpoint(char* id){
if(id == NULL || strlen(id) == 0){
int deleteCheckpoint(char* id) {
if (id == NULL || strlen(id) == 0) {
stError("deleteCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeAddress) != 0){
if (strlen(tsSnodeAddress) != 0) {
return deleteRsync(id);
}else if(tsS3StreamEnabled){
} else if (tsS3StreamEnabled) {
s3DeleteObjectsByPrefix(id);
}
return 0;
}
int deleteCheckpointFile(char* id, char* name){
int deleteCheckpointFile(char* id, char* name) {
char object[128] = {0};
snprintf(object, sizeof(object), "%s/%s", id, name);
char *tmp = object;
char* tmp = object;
s3DeleteObjects((const char**)&tmp, 1);
return 0;
}

View File

@ -14,9 +14,9 @@
*/
#include "streamInt.h"
#include "tmisce.h"
#include "trpc.h"
#include "ttimer.h"
#include "tmisce.h"
typedef struct SBlockName {
uint32_t hashValue;
@ -343,7 +343,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT ||
pDataBlock->info.type == STREAM_TRANS_STATE) {
for (int32_t j = 0; j < numOfVgroups; j++) {
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
if (code != 0) {
@ -362,7 +363,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
}
code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId);
if(code != 0) {
if (code != 0) {
destroyDispatchMsg(pReqs, numOfVgroups);
return code;
}
@ -371,7 +372,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
pTask->msgInfo.pData = pReqs;
}
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch, pTask->pMeta->stage);
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch,
pTask->pMeta->stage);
return code;
}
@ -393,8 +395,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
id, pTask->info.selfChildId, numOfVgroups, msgId);
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
pTask->info.selfChildId, numOfVgroups, msgId);
for (int32_t i = 0; i < numOfVgroups; i++) {
if (pDispatchMsg[i].blockNum > 0) {
@ -409,7 +411,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
}
}
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups,
msgId);
}
return code;
@ -434,20 +437,20 @@ static void doRetryDispatchData(void* param, void* tmrId) {
SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
taosArrayClear(pTask->msgInfo.pRetryList);
SStreamDispatchReq *pReq = pTask->msgInfo.pData;
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t numOfFailed = taosArrayGetSize(pList);
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
id, pTask->info.selfChildId, numOfFailed, msgId);
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
pTask->info.selfChildId, numOfFailed, msgId);
for (int32_t i = 0; i < numOfFailed; i++) {
int32_t vgId = *(int32_t*) taosArrayGet(pList, i);
int32_t vgId = *(int32_t*)taosArrayGet(pList, i);
for(int32_t j = 0; j < numOfVgroups; ++j) {
for (int32_t j = 0; j < numOfVgroups; ++j) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo->vgId == vgId) {
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
@ -461,7 +464,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
}
}
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId);
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
numOfFailed, msgId);
} else {
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
@ -478,8 +482,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
if (code != TSDB_CODE_SUCCESS) {
if (!streamTaskShouldStop(pTask)) {
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (streamTaskShouldPause(pTask)) {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
@ -531,10 +535,12 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
if (pDataBlock->info.parTbName[0]) {
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
pDataBlock->info.parTbName);
} else {
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
pDataBlock->info.parTbName);
}
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
@ -576,13 +582,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
if (numOfElems > 0) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id,
numOfElems, size);
}
// to make sure only one dispatch is running
@ -641,7 +649,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
stDebug(
"s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
@ -665,7 +674,8 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
pTask->notReadyTasks = 1;
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
&pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
@ -673,8 +683,8 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
numOfVgs, p);
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
pTask->id.idStr, numOfVgs, p);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamTaskId = pVgInfo->taskId;
@ -702,7 +712,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
}
taosArrayClear(pTask->pReadyMsgList);
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, num);
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
num);
return TSDB_CODE_SUCCESS;
}
@ -872,8 +883,8 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
return 0;
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask, int8_t isSucceed) {
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed) {
SStreamChkptReadyInfo info = {0};
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed);
@ -882,7 +893,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
}
taosArrayPush(pTask->pReadyMsgList, &info);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pReadyMsgList));
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr,
(int32_t)taosArrayGetSize(pTask->pReadyMsgList));
return TSDB_CODE_SUCCESS;
}
@ -932,8 +944,10 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
info.msg.info.noResp = 1; // refactor later.
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d, vgId:%d",
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index, req.upstreamNodeId);
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
":0x%x (vgId:%d) idx:%d, vgId:%d",
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index,
req.upstreamNodeId);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
@ -943,6 +957,16 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
return 0;
}
// Hand the payload (msg.pCont) of every queued checkpoint-ready message to rpcFreeCont(), then clear the list.
// A task whose pReadyMsgList is NULL is left untouched; the list pointer itself is not reset here.
void streamClearChkptReadyMsg(SStreamTask* pTask) {
  if (pTask->pReadyMsgList != NULL) {
    int idx = 0;
    while (idx < taosArrayGetSize(pTask->pReadyMsgList)) {
      SStreamChkptReadyInfo* pReady = taosArrayGet(pTask->pReadyMsgList, idx);
      rpcFreeCont(pReady->msg.pCont);
      idx += 1;
    }

    taosArrayClear(pTask->pReadyMsgList);
  }
}
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -965,7 +989,8 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory
return 0;
}
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen) {
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
@ -1065,7 +1090,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
} else {
stDebug("s-task:%s dispatch completed, elapsed time:%"PRId64"ms", pTask->id.idStr, el);
stDebug("s-task:%s dispatch completed, elapsed time:%" PRId64 "ms", pTask->id.idStr, el);
}
// now ready for next data output
@ -1095,7 +1120,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,
vgId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
@ -1113,8 +1139,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast.
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
} else {
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
@ -1158,7 +1184,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
ASSERT(leftRsp >= 0);
if (leftRsp > 0) {
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting for %d rsp",
stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting "
"for %d rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp);
} else {
stDebug(
@ -1166,8 +1194,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
}
} else {
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s", id,
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
}
ASSERT(leftRsp >= 0);

View File

@ -15,11 +15,11 @@
#include "executor.h"
#include "streamInt.h"
#include "streamsm.h"
#include "tmisce.h"
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
#include "streamsm.h"
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
@ -310,7 +310,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
" nextProcessVer:%" PRId64", checkpointCount:%d",
" nextProcessVer:%" PRId64 ", checkpointCount:%d",
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer,
pStatis->checkpoint);
@ -358,7 +358,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
walCloseReader(pTask->exec.pWalReader);
}
streamClearChkptReadyMsg(pTask);
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
if (pTask->msgInfo.pData != NULL) {
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
@ -442,7 +444,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pTask->outputInfo.pTokenBucket == NULL) {
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -529,8 +532,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->nodeId == nodeId) {
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
pInfo->taskId, nodeId, buf);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId,
nodeId, buf);
break;
}
}
@ -720,7 +723,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) {
}
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;