fix(stream): add new status for meta.

This commit is contained in:
Haojun Liao 2023-09-26 12:15:12 +08:00
parent d214a2f6bd
commit f3f18b0fa3
11 changed files with 54 additions and 75 deletions

View File

@ -40,6 +40,10 @@ extern "C" {
#define TASK_DOWNSTREAM_NOT_LEADER 0x2
#define TASK_SELF_NEW_STAGE 0x3
#define NODE_ROLE_UNINIT 0x1
#define NODE_ROLE_LEADER 0x2
#define NODE_ROLE_FOLLOWER 0x3
typedef struct SStreamTask SStreamTask;
#define SSTREAM_TASK_VER 2
@ -411,7 +415,8 @@ typedef struct SStreamMeta {
FTaskExpand* expandFunc;
int32_t vgId;
int64_t stage;
bool leader;
// bool leader;
int32_t role;
STaskStartInfo startInfo;
SRWLatch lock;
int32_t walScanCounter;

View File

@ -225,7 +225,7 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqScanWalAsync(STQ* pTq, bool ckPause);
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
@ -249,8 +249,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);

View File

@ -882,7 +882,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return 0;
}
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -908,7 +908,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
};
// only the leader node handle the check request
if (!pMeta->leader) {
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
@ -932,7 +932,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
}
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pTq->pStreamMeta->vgId;
@ -1736,7 +1736,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
}
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -1848,8 +1848,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet);
pMeta->startInfo.startedAfterNodeUpdate = 1;
if (updateTasks < numOfTasks) {
pMeta->startInfo.startedAfterNodeUpdate = 1;
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
taosWUnLockLatch(&pMeta->lock);

View File

@ -24,23 +24,22 @@ typedef struct STableSinkInfo {
tstr name;
} STableSinkInfo;
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData);
static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData);
static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid);
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
const char* dstTableName, int64_t* uid);
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id);
static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock);
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) {
@ -255,7 +254,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
void* pBuf = NULL;
int32_t numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData);
int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len);
int32_t code = buildSubmitMsgImpl(pReq, vgId, &pBuf, &len);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
return code;
@ -274,7 +273,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
pRec->numOfSubmit += 1;
if ((pRec->numOfSubmit % 5000) == 0) {
double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0;
double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, %.2fMiB duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el);
@ -462,7 +461,7 @@ int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo,
return code;
}
int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
int32_t code = 0;
void* pBuf = NULL;
*msgLen = 0;

View File

@ -42,7 +42,7 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
int64_t stage = pMeta->stage;
pMeta->stage = state.term;
pMeta->leader = isLeader;
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
if (isLeader) {
tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
state.term, stage, isLeader);

View File

@ -754,9 +754,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
return tqProcessTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP:
return tqProcessStreamTaskCheckRsp(pVnode->pTq, pMsg);
return tqProcessTaskCheckRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
@ -768,7 +768,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;

View File

@ -224,7 +224,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL);
if (!pTask->pMeta->leader) {
if (pTask->pMeta->role == NODE_ROLE_FOLLOWER) {
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
status = TASK_INPUT_STATUS__REFUSED;
} else {

View File

@ -1021,18 +1021,19 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
const char* id = pTask->id.idStr;
int32_t level = pTask->info.taskLevel;
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
for (int32_t i = 0; i < num; ++i) {
SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
tmsgSendRsp(&pInfo->msg);
stDebug("s-task:%s level:%d notify upstream:0x%x to continue process data in WAL", pTask->id.idStr, pTask->info.taskLevel,
pInfo->taskId);
stDebug("s-task:%s level:%d notify upstream:0x%x continuing scan data in WAL", id, level, pInfo->taskId);
}
taosArrayClear(pTask->pRspMsgList);
stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
num);
stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", id, level, num);
return 0;
}
@ -1063,7 +1064,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t vgId = pTask->pMeta->vgId;
int32_t msgId = pTask->execInfo.dispatch;
if ((!pTask->pMeta->leader) || (pTask->status.downstreamReady != 1)) {
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
@ -1160,10 +1161,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS;
} else {
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
}

View File

@ -195,7 +195,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
taosMsleep(2 * 1000);
taosMsleep(500);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) {
stError("vgId:%d failed to init stream backend", pMeta->vgId);
@ -204,6 +204,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
pMeta->role = NODE_ROLE_UNINIT;
code = streamBackendLoadCheckpointInfo(pMeta);
taosInitRWLatch(&pMeta->lock);
@ -236,6 +237,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
pMeta->role = NODE_ROLE_UNINIT;
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
@ -261,14 +263,14 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
taosMsleep(2 * 1000);
taosMsleep(500);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) {
stError("vgId:%d failed to init stream backend", pMeta->vgId);
stInfo("vgId:%d retry to init stream backend", pMeta->vgId);
// return -1;
}
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta);
@ -345,6 +347,7 @@ void streamMetaCloseImpl(void* arg) {
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);
pMeta->role = NODE_ROLE_UNINIT;
taosMemoryFree(pMeta);
stDebug("end to close stream meta");
}
@ -828,7 +831,7 @@ void metaHbToMnode(void* param, void* tmrId) {
}
// not leader not send msg
if (!pMeta->leader) {
if (pMeta->role == NODE_ROLE_FOLLOWER) {
stInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
@ -846,7 +849,7 @@ void metaHbToMnode(void* param, void* tmrId) {
return;
}
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, pMeta->leader);
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
SStreamHbMsg hbMsg = {0};
taosRLockLatch(&pMeta->lock);
@ -953,7 +956,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
pMeta->leader, pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
taosWLockLatch(&pMeta->lock);
@ -972,7 +975,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
taosWUnLockLatch(&pMeta->lock);
// wait for the stream meta hb function stopping
if (pMeta->leader) {
if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
@ -1001,5 +1004,5 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
void streamMetaInitForSnode(SStreamMeta* pMeta) {
pMeta->stage = 0;
pMeta->leader = true;
pMeta->role = NODE_ROLE_LEADER;
}

View File

@ -1000,36 +1000,6 @@ void streamTaskEnablePause(SStreamTask* pTask) {
pTask->status.pauseAllowed = 1;
}
// fix: this function should be removed, it may cause deadlock.
//void streamTaskHalt(SStreamTask* pTask) {
// int8_t status = pTask->status.taskStatus;
// if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
// return;
// }
//
// if (status == TASK_STATUS__HALT) {
// return;
// }
//
// // wait for checkpoint completed
// while(pTask->status.taskStatus == TASK_STATUS__CK) {
// qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
// streamGetTaskStatusStr(TASK_STATUS__CK));
// taosMsleep(1000);
// }
//
// // upgrade to halt status
// if (status == TASK_STATUS__PAUSE) {
// stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
// streamGetTaskStatusStr(TASK_STATUS__PAUSE));
// } else {
// stDebug("s-task:%s halt task", pTask->id.idStr);
// }
//
// pTask->status.keepTaskStatus = status;
// pTask->status.taskStatus = TASK_STATUS__HALT;
//}
void streamTaskResumeFromHalt(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int8_t status = pTask->status.taskStatus;

View File

@ -554,26 +554,26 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
}
int32_t streamTaskStop(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pTask->pMeta->vgId;
int64_t st = taosGetTimestampMs();
const char* id = pTask->id.idStr;
taosThreadMutexLock(&pTask->lock);
if (pTask->status.taskStatus == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint will be discarded since task is stopped", pTask->id.idStr);
stDebug("s-task:%s in checkpoint will be discarded since task is stopped", id);
}
pTask->status.taskStatus = TASK_STATUS__STOP;
taosThreadMutexUnlock(&pTask->lock);
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {
stDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
pTask->info.taskLevel);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el);
stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el);
return 0;
}