Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0
This commit is contained in:
commit
b0d9c2632f
|
@ -56,7 +56,6 @@ extern "C" {
|
||||||
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
||||||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||||
#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7)
|
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
typedef struct SStreamQueue SStreamQueue;
|
typedef struct SStreamQueue SStreamQueue;
|
||||||
|
@ -783,7 +782,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
|
||||||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
||||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock);
|
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
|
||||||
|
|
||||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
|
||||||
int32_t tsNumOfMnodeFetchThreads = 1;
|
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||||
int32_t tsNumOfMnodeReadThreads = 1;
|
int32_t tsNumOfMnodeReadThreads = 1;
|
||||||
int32_t tsNumOfVnodeQueryThreads = 4;
|
int32_t tsNumOfVnodeQueryThreads = 4;
|
||||||
float tsRatioOfVnodeStreamThreads = 1.5F;
|
float tsRatioOfVnodeStreamThreads = 0.5F;
|
||||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||||
int32_t tsNumOfQnodeQueryThreads = 4;
|
int32_t tsNumOfQnodeQueryThreads = 4;
|
||||||
|
|
|
@ -142,8 +142,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
||||||
if (ppHTask == NULL || *ppHTask == NULL) {
|
if (ppHTask == NULL || *ppHTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
|
tqError(
|
||||||
vgId, req.taskId);
|
"vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
|
||||||
|
"stream task:0x%x",
|
||||||
|
vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
|
||||||
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
|
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||||
|
@ -612,23 +614,35 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
|
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
STaskId hTaskId = {0};
|
||||||
|
|
||||||
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
streamMetaWLock(pMeta);
|
||||||
if (pTask != NULL) {
|
|
||||||
// drop the related fill-history task firstly
|
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
|
||||||
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
|
||||||
|
streamMetaAcquireOneTask(*ppTask);
|
||||||
|
SStreamTask* pTask = *ppTask;
|
||||||
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
hTaskId.streamId = pTask->hTaskInfo.id.streamId;
|
||||||
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
|
hTaskId.taskId = pTask->hTaskInfo.id.taskId;
|
||||||
tqDebug("s-task:0x%x vgId:%d drop fill-history task:0x%x firstly", pReq->taskId, vgId,
|
|
||||||
(int32_t)pHTaskId->taskId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
// drop the related fill-history task firstly
|
||||||
|
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
||||||
|
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
|
||||||
|
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
||||||
|
}
|
||||||
|
|
||||||
// drop the stream task now
|
// drop the stream task now
|
||||||
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
|
@ -99,6 +99,7 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
|
||||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
||||||
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
||||||
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
||||||
|
void clearBufferedDispatchMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||||
|
|
|
@ -315,6 +315,16 @@ int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
|
||||||
: taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
: taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void clearBufferedDispatchMsg(SStreamTask* pTask) {
|
||||||
|
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||||
|
if (pMsgInfo->pData != NULL) {
|
||||||
|
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
|
||||||
|
}
|
||||||
|
|
||||||
|
pMsgInfo->pData = NULL;
|
||||||
|
pMsgInfo->dispatchMsgType = 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||||
|
@ -678,8 +688,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
// todo deal with only partially success dispatch case
|
// todo deal with only partially success dispatch case
|
||||||
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
|
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
|
||||||
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
|
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
|
||||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
clearBufferedDispatchMsg(pTask);
|
||||||
pTask->msgInfo.pData = NULL;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -740,6 +749,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
||||||
|
|
||||||
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
|
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
|
||||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||||
|
ASSERT(dataStrLen > 0);
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||||
if (buf == NULL) return -1;
|
if (buf == NULL) return -1;
|
||||||
|
|
||||||
|
@ -936,15 +947,12 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||||
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
||||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
|
||||||
|
|
||||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
if (delayDispatch) {
|
if (delayDispatch) {
|
||||||
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->msgInfo.pData = NULL;
|
clearBufferedDispatchMsg(pTask);
|
||||||
pTask->msgInfo.dispatchMsgType = 0;
|
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
||||||
|
|
||||||
|
@ -1084,7 +1092,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
} else { // this message has been sent successfully, let's try next one.
|
} else { // this message has been sent successfully, let's try next one.
|
||||||
pTask->msgInfo.retryCount = 0;
|
pTask->msgInfo.retryCount = 0;
|
||||||
|
|
||||||
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||||
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
|
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
|
||||||
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
|
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
|
||||||
ASSERT(pTask->info.fillHistory == 1);
|
ASSERT(pTask->info.fillHistory == 1);
|
||||||
|
@ -1093,6 +1101,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
|
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clearBufferedDispatchMsg(pTask);
|
||||||
|
|
||||||
// now ready for next data output
|
// now ready for next data output
|
||||||
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -412,9 +412,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||||
|
|
||||||
if (pTask->msgInfo.pData != NULL) {
|
if (pTask->msgInfo.pData != NULL) {
|
||||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
clearBufferedDispatchMsg(pTask);
|
||||||
pTask->msgInfo.pData = NULL;
|
|
||||||
pTask->msgInfo.dispatchMsgType = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||||
|
@ -764,21 +762,13 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) {
|
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
|
||||||
if (pTask == NULL) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||||
if (pTask->info.fillHistory == 0) {
|
if (pTask->info.fillHistory == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaLock) {
|
|
||||||
streamMetaWLock(pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
||||||
if (ppStreamTask != NULL) {
|
if (ppStreamTask != NULL) {
|
||||||
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
|
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
|
||||||
|
@ -796,10 +786,6 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
|
||||||
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
|
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaLock) {
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue