Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/ly_mem_leak

This commit is contained in:
54liuyao 2024-08-19 08:54:43 +08:00
commit 1b654c5a9a
30 changed files with 819 additions and 665 deletions

View File

@ -19,7 +19,6 @@
#include "os.h" #include "os.h"
#include "streamMsg.h" #include "streamMsg.h"
#include "streamState.h" #include "streamState.h"
#include "streamMsg.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdbInt.h" #include "tdbInt.h"
#include "tmsg.h" #include "tmsg.h"
@ -266,14 +265,14 @@ typedef struct SStreamTaskId {
} SStreamTaskId; } SStreamTaskId;
typedef struct SCheckpointInfo { typedef struct SCheckpointInfo {
int64_t startTs; int64_t startTs;
int64_t checkpointId; // latest checkpoint id int64_t checkpointId; // latest checkpoint id
int64_t checkpointVer; // latest checkpoint offset in wal int64_t checkpointVer; // latest checkpoint offset in wal
int64_t checkpointTime; // latest checkpoint time int64_t checkpointTime; // latest checkpoint time
int64_t processedVer; int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t msgVer; int64_t msgVer;
int32_t consensusTransId;// consensus checkpoint id int32_t consensusTransId; // consensus checkpoint id
SActiveCheckpointInfo* pActiveInfo; SActiveCheckpointInfo* pActiveInfo;
} SCheckpointInfo; } SCheckpointInfo;
@ -455,6 +454,7 @@ struct SStreamTask {
void* pBackend; void* pBackend;
int8_t subtableWithoutMd5; int8_t subtableWithoutMd5;
char reserve[256]; char reserve[256];
char* backendPath;
}; };
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
@ -591,9 +591,9 @@ typedef struct STaskStatusEntry {
int32_t statusLastDuration; // to record the last duration of current status int32_t statusLastDuration; // to record the last duration of current status
int64_t stage; int64_t stage;
int32_t nodeId; int32_t nodeId;
SVersionRange verRange; // start/end version in WAL, only valid for source task SVersionRange verRange; // start/end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task int64_t processedVer; // only valid for source task
double inputQUsed; // in MiB double inputQUsed; // in MiB
double inputRate; double inputRate;
double procsThroughput; // duration between one element put into input queue and being processed. double procsThroughput; // duration between one element put into input queue and being processed.
double procsTotal; // duration between one element put into input queue and being processed. double procsTotal; // duration between one element put into input queue and being processed.
@ -678,9 +678,9 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
// check downstream status // check downstream status
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask); void streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
// fill-history task // fill-history task
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
@ -717,8 +717,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
bool streamTaskIsSinkTask(const SStreamTask* pTask); bool streamTaskIsSinkTask(const SStreamTask* pTask);
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask); void streamTaskSetRemoveBackendFiles(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask); STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask);
// source level // source level
@ -750,6 +750,9 @@ void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready); int64_t endTs, bool ready);
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo);
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
@ -770,7 +773,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta); bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
// timer // timer
int32_t streamTimerGetInstance(tmr_h* pTmr); int32_t streamTimerGetInstance(tmr_h* pTmr);
@ -812,9 +815,9 @@ void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask);
void streamMutexLock(TdThreadMutex *pMutex); void streamMutexLock(TdThreadMutex* pMutex);
void streamMutexUnlock(TdThreadMutex *pMutex); void streamMutexUnlock(TdThreadMutex* pMutex);
void streamMutexDestroy(TdThreadMutex *pMutex); void streamMutexDestroy(TdThreadMutex* pMutex);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -95,6 +95,7 @@ typedef struct {
} SWalCkHead; } SWalCkHead;
#pragma pack(pop) #pragma pack(pop)
typedef void (*stopDnodeFn)();
typedef struct SWal { typedef struct SWal {
// cfg // cfg
SWalCfg cfg; SWalCfg cfg;
@ -117,6 +118,9 @@ typedef struct SWal {
SHashObj *pRefHash; // refId -> SWalRef SHashObj *pRefHash; // refId -> SWalRef
// path // path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
stopDnodeFn stopDnode;
// reusable write head // reusable write head
SWalCkHead writeHead; SWalCkHead writeHead;
} SWal; } SWal;
@ -152,7 +156,7 @@ typedef struct SWalReader {
} SWalReader; } SWalReader;
// module initialization // module initialization
int32_t walInit(); int32_t walInit(stopDnodeFn stopDnode);
void walCleanUp(); void walCleanUp();
// handle open and ctl // handle open and ctl

View File

@ -209,11 +209,11 @@ function clean_service_on_launchctl() {
} }
function remove_data_and_config() { function remove_data_and_config() {
data_dir=`grep dataDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` data_dir=`grep dataDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
if [ X"$data_dir" == X"" ]; then if [ X"$data_dir" == X"" ]; then
data_dir="/var/lib/${PREFIX}" data_dir="/var/lib/${PREFIX}"
fi fi
log_dir=`grep logDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` log_dir=`grep logDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
if [ X"$log_dir" == X"" ]; then if [ X"$log_dir" == X"" ]; then
log_dir="/var/log/${PREFIX}" log_dir="/var/log/${PREFIX}"
fi fi

View File

@ -85,7 +85,7 @@ static int32_t mndOpenWrapper(const char *path, SMnodeOpt *opt, SMnode **pMnode)
} }
static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
int32_t code = 0; int32_t code = 0;
if ((code = walInit()) != 0) { if ((code = walInit(pInput->stopDnodeFp)) != 0) {
dError("failed to init wal since %s", tstrerror(code)); dError("failed to init wal since %s", tstrerror(code));
return code; return code;
} }

View File

@ -624,8 +624,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
goto _OVER; goto _OVER;
} }
tmsgReportStartup("vnode-tfs", "initialized"); tmsgReportStartup("vnode-tfs", "initialized");
if ((code = walInit(pInput->stopDnodeFp)) != 0) {
if ((code = walInit()) != 0) {
dError("failed to init wal since %s", tstrerror(code)); dError("failed to init wal since %s", tstrerror(code));
goto _OVER; goto _OVER;
} }
@ -638,7 +637,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
} }
tmsgReportStartup("vnode-sync", "initialized"); tmsgReportStartup("vnode-sync", "initialized");
if ((code = vnodeInit(tsNumOfCommitThreads)) != 0) { if ((code = vnodeInit(tsNumOfCommitThreads, pInput->stopDnodeFp)) != 0) {
dError("failed to init vnode since %s", tstrerror(code)); dError("failed to init vnode since %s", tstrerror(code));
goto _OVER; goto _OVER;
} }

View File

@ -414,6 +414,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
.getMnodeLoadsFp = dmGetMnodeLoads, .getMnodeLoadsFp = dmGetMnodeLoads,
.getQnodeLoadsFp = dmGetQnodeLoads, .getQnodeLoadsFp = dmGetQnodeLoads,
.stopDnodeFp = dmStop,
}; };
opt.msgCb = dmGetMsgcb(pWrapper->pDnode); opt.msgCb = dmGetMsgcb(pWrapper->pDnode);

View File

@ -121,6 +121,7 @@ typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef void (*StopDnodeFp)();
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
@ -159,6 +160,7 @@ typedef struct {
GetVnodeLoadsFp getVnodeLoadsLiteFp; GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp;
StopDnodeFp stopDnodeFp;
} SMgmtInputOpt; } SMgmtInputOpt;
typedef struct { typedef struct {

View File

@ -581,7 +581,7 @@ void mndDumpSdb() {
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefault(&msgCb); tmsgSetDefault(&msgCb);
(void)walInit(); (void)walInit(NULL);
(void)syncInit(); (void)syncInit();
SMnodeOpt opt = {.msgCb = msgCb}; SMnodeOpt opt = {.msgCb = msgCb};

View File

@ -24,6 +24,7 @@
#include "mndVgroup.h" #include "mndVgroup.h"
#include "osMemory.h" #include "osMemory.h"
#include "parser.h" #include "parser.h"
#include "taoserror.h"
#include "tmisce.h" #include "tmisce.h"
#include "tname.h" #include "tname.h"
@ -879,6 +880,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
if (code == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
SName dbname = {0}; SName dbname = {0};
code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (code) { if (code) {
@ -3058,4 +3063,4 @@ _err:
mDebug("create drop %d orphan tasks trans succ", numOfTasks); mDebug("create drop %d orphan tasks trans succ", numOfTasks);
} }
return code; return code;
} }

View File

@ -26,12 +26,13 @@ static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList); static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId); static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo); static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
@ -52,7 +53,7 @@ void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
} }
} }
void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
int32_t num = taosArrayGetSize(pList); int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SFailedCheckpointInfo *p = taosArrayGet(pList, i); SFailedCheckpointInfo *p = taosArrayGet(pList, i);
@ -401,13 +402,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) { if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId); checkforOrphanTask(pMnode, p, pOrphanTasks);
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
void* px = taosArrayPush(pOrphanTasks, &oTask);
if (px == NULL) {
mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId);
}
continue; continue;
} }
@ -423,7 +418,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
if (code) { if (code) {
mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code)); mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s",
p->id.streamId, (int32_t)p->id.taskId, tstrerror(code));
continue; continue;
} }
@ -434,7 +430,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (code == 0) { if (code == 0) {
mndAddConsensusTasks(pInfo, &cp); mndAddConsensusTasks(pInfo, &cp);
} else { } else {
mError("failed to get consensus checkpoint-info"); mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId);
} }
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
@ -454,7 +450,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SFailedCheckpointInfo info = { SFailedCheckpointInfo info = {
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedChkpt, &info); addIntoFailedChkptList(pFailedChkpt, &info);
// remove failed trans from pChkptStreams // remove failed trans from pChkptStreams
code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId); code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
@ -516,6 +512,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pMnode != NULL) { // make sure that the unit test case can work if (pMnode != NULL) { // make sure that the unit test case can work
code = mndStreamSendUpdateChkptInfoMsg(pMnode); code = mndStreamSendUpdateChkptInfoMsg(pMnode);
if (code) {
mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code));
}
} }
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
@ -554,3 +553,26 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
pRpcInfo->handle = NULL; // disable auto rsp pRpcInfo->handle = NULL; // disable auto rsp
} }
void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) {
SStreamObj *pStream = NULL;
int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
if (code) {
mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list",
p->id.streamId, p->id.taskId);
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
void *px = taosArrayPush(pOrphanTasks, &oTask);
if (px == NULL) {
mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, tstrerror(terrno));
}
} else {
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet",
p->id.taskId);
}
}

View File

@ -49,7 +49,9 @@ typedef struct SVSnapWriter SVSnapWriter;
extern const SVnodeCfg vnodeCfgDefault; extern const SVnodeCfg vnodeCfgDefault;
int32_t vnodeInit(int32_t nthreads); typedef void (*StopDnodeFp)();
int32_t vnodeInit(int32_t nthreads, StopDnodeFp stopDnodeFp);
void vnodeCleanup(); void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
bool vnodeShouldRemoveWal(SVnode *pVnode); bool vnodeShouldRemoveWal(SVnode *pVnode);

View File

@ -14,7 +14,10 @@
*/ */
#include "tq.h" #include "tq.h"
#include "osDef.h"
#include "taoserror.h"
#include "tqCommon.h" #include "tqCommon.h"
#include "tstream.h"
#include "vnd.h" #include "vnd.h"
// 0: not init // 0: not init
@ -153,7 +156,7 @@ void tqNotifyClose(STQ* pTq) {
} }
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
int32_t code = 0; int32_t code = 0;
SMqPollReq req = {0}; SMqPollReq req = {0};
code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req); code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req);
if (code < 0) { if (code < 0) {
@ -169,7 +172,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
} }
dataRsp.common.blockNum = 0; dataRsp.common.blockNum = 0;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf,
req.reqId); req.reqId);
@ -180,15 +183,15 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
} }
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type,
int32_t type, int32_t vgId) { int32_t vgId) {
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
char buf1[TSDB_OFFSET_LEN] = {0}; char buf1[TSDB_OFFSET_LEN] = {0};
char buf2[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0};
(void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset);
(void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset);
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId);
@ -200,7 +203,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
SMqVgOffset vgOffset = {0}; SMqVgOffset vgOffset = {0};
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0; int32_t code = 0;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
@ -233,12 +236,13 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
// save the new offset value // save the new offset value
if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))){ if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, msgLen - sizeof(vgOffset.consumerId)) < 0) { if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg,
msgLen - sizeof(vgOffset.consumerId)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -416,7 +420,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
@ -447,7 +451,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
STqOffset* pSavedOffset = NULL; STqOffset* pSavedOffset = NULL;
int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset); int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
if (code != 0) { if (code != 0) {
return TSDB_CODE_TMQ_NO_COMMITTED; return TSDB_CODE_TMQ_NO_COMMITTED;
} }
@ -479,7 +483,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = 0; int32_t code = 0;
SMqPollReq req = {0}; SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
@ -505,7 +509,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
consumerId, vgId, req.subKey, pHandle->consumerId); consumerId, vgId, req.subKey, pHandle->consumerId);
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
return TSDB_CODE_TMQ_CONSUMER_MISMATCH; return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
} }
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
@ -612,8 +615,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqCheckInfo info = {0}; STqCheckInfo info = {0};
int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen); int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen);
if(code != 0){ if (code != 0) {
return code; return code;
} }
@ -650,7 +653,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosRLockLatch(&pTq->lock); taosRLockLatch(&pTq->lock);
STqHandle* pHandle = NULL; STqHandle* pHandle = NULL;
(void)tqMetaGetHandle(pTq, req.subKey, &pHandle); //ignore return code (void)tqMetaGetHandle(pTq, req.subKey, &pHandle); // ignore return code
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
@ -697,7 +700,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
} }
end: end:
tDecoderClear(&dc); tDecoderClear(&dc);
return ret; return ret;
} }
@ -705,7 +708,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
STQ* pTq = (STQ*) pTqObj; STQ* pTq = (STQ*)pTqObj;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to build task", pTask->id.taskId); tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
@ -749,12 +752,12 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
} }
streamTaskResetUpstreamStageInfo(pTask); streamTaskResetUpstreamStageInfo(pTask);
(void) streamSetupScheduleTrigger(pTask); (void)streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask).name; char* p = streamTaskGetStatus(pTask).name;
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -766,14 +769,13 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer); (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
} else { } else {
tqInfo( tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
"vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" nextProcessVer:%" PRId64 " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x "
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x delaySched:%" PRId64 "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
" ms, inputVer:%" PRId64, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer);
} }
@ -781,8 +783,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
return 0; return 0;
} }
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
@ -803,13 +804,13 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
pTask->execInfo.step2Start = taosGetTimestampMs(); pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) { if (done) {
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
pStep2Range->maxVer, 0.0); pStep2Range->minVer, pStep2Range->maxVer, 0.0);
int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost. int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost.
if (code) { if (code) {
qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code)); qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
} }
(void) streamExecTask(pTask); // exec directly (void)streamExecTask(pTask); // exec directly
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
@ -830,12 +831,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
(void) streamTaskSetSchedStatusInactive(pTask); (void)streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files. // now the fill-history task starts to scan data from wal files.
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
(void) tqScanWalAsync(pTq, false); (void)tqScanWalAsync(pTq, false);
} }
} }
} }
@ -846,9 +847,9 @@ int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
SStreamMeta* pMeta = pStreamTask->pMeta; SStreamMeta* pMeta = pStreamTask->pMeta;
STaskId hId = pStreamTask->hTaskInfo.id; STaskId hId = pStreamTask->hTaskInfo.id;
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask); int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t) hId.taskId); tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -930,8 +931,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (retInfo.ret == TASK_SCANHISTORY_REXEC) { if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
streamExecScanHistoryInFuture(pTask, retInfo.idleTime); streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
} else { } else {
SStreamTaskState p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
ETaskStatus s = p.state; ETaskStatus s = p.state;
if (s == TASK_STATUS__PAUSE) { if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
@ -963,7 +964,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; // todo: handle failure return code; // todo: handle failure
} }
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
@ -988,7 +989,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// let's continue scan data in the wal files // let's continue scan data in the wal files
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
(void) tqScanWalAsync(pTq, false); // it's ok to failed (void)tqScanWalAsync(pTq, false); // it's ok to failed
} }
return code; return code;
@ -1026,11 +1027,9 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg); return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
} }
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; }
return 0;
}
int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) { int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -1092,18 +1091,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
@ -1111,9 +1110,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it", ", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId); vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
} }
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
@ -1123,7 +1122,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed", " transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId); vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1136,9 +1135,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // todo retry handle error return TSDB_CODE_SUCCESS; // todo retry handle error
} }
// todo save the checkpoint failed info // todo save the checkpoint failed info
@ -1154,14 +1153,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
if (status != TASK_STATUS__HALT) { if (status != TASK_STATUS__HALT) {
tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
} }
} }
@ -1178,16 +1177,17 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // checkpoint already finished, and not in checkpoint status } else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) { if (req.checkpointId <= pTask->chkInfo.checkpointId) {
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); " transId:%d already handled, return success",
pTask->id.idStr, req.checkpointId, req.transId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1198,7 +1198,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (code) { if (code) {
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
tstrerror(code));
return code; return code;
} }
@ -1215,7 +1216,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1228,7 +1229,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId); (int32_t)pReq->downstreamTaskId);
@ -1249,7 +1250,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId); (int32_t)pReq->downstreamTaskId);
@ -1264,9 +1265,7 @@ int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
} }
// this function is needed, do not try to remove it. // this function is needed, do not try to remove it.
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);

View File

@ -131,27 +131,6 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
} }
int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId);
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) {
tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
code = streamTaskSendRestoreChkptMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
// this is to process request from transaction, always return true. // this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;

View File

@ -157,41 +157,35 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
int64_t numRecord = 0; int64_t numRecord = 0;
SMetaInfo info; SMetaInfo info;
if (committer->tsdb->imem->nDel == 0) { // if no history data and no new timestamp data, skip tomb data
goto _exit; if (committer->ctx->info->fset || committer->ctx->hasTSData) {
} committer->ctx->tbid->suid = 0;
committer->ctx->tbid->uid = 0;
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
if (record->uid != committer->ctx->tbid->uid) {
committer->ctx->tbid->suid = record->suid;
committer->ctx->tbid->uid = record->uid;
// do not need to write tomb data if there is no ts data if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
bool skip = (committer->ctx->info->fset == NULL && !committer->ctx->hasTSData); TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit);
continue;
committer->ctx->tbid->suid = 0; }
committer->ctx->tbid->uid = 0;
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
if (record->uid != committer->ctx->tbid->uid) {
committer->ctx->tbid->suid = record->suid;
committer->ctx->tbid->uid = record->uid;
if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit);
continue;
} }
}
if (record->ekey < committer->ctx->minKey) { if (record->ekey < committer->ctx->minKey) {
// do nothing // do nothing
} else if (record->skey > committer->ctx->maxKey) { } else if (record->skey > committer->ctx->maxKey) {
// committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey); // committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey);
} else { } else {
record->skey = TMAX(record->skey, committer->ctx->minKey); record->skey = TMAX(record->skey, committer->ctx->minKey);
record->ekey = TMIN(record->ekey, committer->ctx->maxKey); record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
if (!skip) {
numRecord++; numRecord++;
TAOS_CHECK_GOTO(tsdbFSetWriteTombRecord(committer->writer, record), &lino, _exit); TAOS_CHECK_GOTO(tsdbFSetWriteTombRecord(committer->writer, record), &lino, _exit);
} }
}
TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit); TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit);
}
} }
_exit: _exit:

View File

@ -146,10 +146,14 @@ static void updateBlockLoadSlot(SSttBlockLoadInfo *pLoadInfo) {
pLoadInfo->currentLoadBlockIndex = nextSlotIndex; pLoadInfo->currentLoadBlockIndex = nextSlotIndex;
} }
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { static int32_t loadLastBlock(SLDataIter *pIter, const char *idStr, SBlockData **pResBlock) {
int32_t code = 0; if (pResBlock != NULL) {
*pResBlock = NULL;
}
int32_t code = 0;
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo; SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) { if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 0) { if (pInfo->currentLoadBlockIndex != 0) {
tsdbDebug("current load index is set to 0, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64 tsdbDebug("current load index is set to 0, block index:%d, fileVer:%" PRId64 ", due to uid:%" PRIu64
@ -157,7 +161,9 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pIter->iSttBlk, pIter->cid, pIter->uid, idStr); pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 0; pInfo->currentLoadBlockIndex = 0;
} }
return &pInfo->blockData[0].data;
*pResBlock = &pInfo->blockData[0].data;
return code;
} }
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) { if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
@ -167,11 +173,13 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pIter->iSttBlk, pIter->cid, pIter->uid, idStr); pIter->iSttBlk, pIter->cid, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 1; pInfo->currentLoadBlockIndex = 1;
} }
return &pInfo->blockData[1].data;
*pResBlock = &pInfo->blockData[1].data;
return code;
} }
if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) { if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
return NULL; return code;
} }
updateBlockLoadSlot(pInfo); updateBlockLoadSlot(pInfo);
@ -181,7 +189,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1], code = tsdbSttFileReadBlockDataByColumn(pIter->pReader, pIter->pSttBlk, pBlock, pInfo->pSchema, &pInfo->colIds[1],
pInfo->numOfCols - 1); pInfo->numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _exit; return code;
} }
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
@ -200,14 +208,9 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockData[0].sttBlockIndex, tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockData[0].sttBlockIndex,
pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr); pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr);
return &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
_exit: *pResBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
if (code != TSDB_CODE_SUCCESS) { return code;
terrno = code;
}
return NULL;
} }
// find the earliest block that contains the required records // find the earliest block that contains the required records
@ -735,12 +738,17 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
} }
} }
static void findNextValidRow(SLDataIter *pIter, const char *idStr) { static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) {
bool hasVal = false; bool hasVal = false;
int32_t step = pIter->backward ? -1 : 1; int32_t step = pIter->backward ? -1 : 1;
int32_t i = pIter->iRow; int32_t i = pIter->iRow;
SBlockData *pData = NULL;
SBlockData *pData = loadLastBlock(pIter, idStr); int32_t code = loadLastBlock(pIter, idStr, &pData);
if (code) {
tsdbError("failed to load stt block, code:%s, %s", tstrerror(code), idStr);
return code;
}
// mostly we only need to find the start position for a given table // mostly we only need to find the start position for a given table
if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) { if ((((i == 0) && (!pIter->backward)) || (i == pData->nRow - 1 && pIter->backward)) && pData->aUid != NULL) {
@ -748,7 +756,7 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
if (i == -1) { if (i == -1) {
tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr); tsdbDebug("failed to find the data in pBlockData, uid:%" PRIu64 " , %s", pIter->uid, idStr);
pIter->iRow = -1; pIter->iRow = -1;
return; return code;
} }
} }
@ -817,20 +825,26 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
} }
pIter->iRow = (hasVal) ? i : -1; pIter->iRow = (hasVal) ? i : -1;
return code;
} }
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) {
int32_t step = pIter->backward ? -1 : 1; int32_t step = pIter->backward ? -1 : 1;
terrno = TSDB_CODE_SUCCESS; int32_t code = 0;
int32_t iBlockL = pIter->iSttBlk;
SBlockData *pBlockData = NULL;
int32_t lino = 0;
*hasNext = false;
terrno = 0;
// no qualified last file block in current file, no need to fetch row // no qualified last file block in current file, no need to fetch row
if (pIter->pSttBlk == NULL) { if (pIter->pSttBlk == NULL) {
return false; return code;
} }
int32_t iBlockL = pIter->iSttBlk; code = loadLastBlock(pIter, idStr, &pBlockData);
SBlockData *pBlockData = loadLastBlock(pIter, idStr); if (pBlockData == NULL || code != TSDB_CODE_SUCCESS) {
if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) {
goto _exit; goto _exit;
} }
@ -838,7 +852,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
while (1) { while (1) {
bool skipBlock = false; bool skipBlock = false;
findNextValidRow(pIter, idStr); code = findNextValidRow(pIter, idStr);
TSDB_CHECK_CODE(code, lino, _exit);
if (pIter->pBlockLoadInfo->checkRemainingRow) { if (pIter->pBlockLoadInfo->checkRemainingRow) {
skipBlock = true; skipBlock = true;
@ -873,8 +888,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
} }
if (iBlockL != pIter->iSttBlk) { if (iBlockL != pIter->iSttBlk) {
pBlockData = loadLastBlock(pIter, idStr); code = loadLastBlock(pIter, idStr, &pBlockData);
if (pBlockData == NULL) { if ((pBlockData == NULL) || (code != 0)) {
goto _exit; goto _exit;
} }
@ -888,7 +903,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit: _exit:
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
return code;
} }
// SMergeTree ================================================= // SMergeTree =================================================
@ -991,7 +1007,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
goto _end; goto _end;
} }
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); bool hasVal = NULL;
code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
if (code) {
goto _end;
}
if (hasVal) { if (hasVal) {
tMergeTreeAddIter(pMTree, pIter); tMergeTreeAddIter(pMTree, pIter);
@ -1004,7 +1025,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
pSttDataInfo->numOfRows += numOfRows; pSttDataInfo->numOfRows += numOfRows;
} }
} else { } else {
TAOS_CHECK_GOTO(terrno, NULL, _end);
if (!pMTree->ignoreEarlierTs) { if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
} }
@ -1086,8 +1106,9 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
if (pMTree->pIter) { if (pMTree->pIter) {
SLDataIter *pIter = pMTree->pIter; SLDataIter *pIter = pMTree->pIter;
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); bool hasVal = false;
if (!hasVal) { int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
if (!hasVal || (code != 0)) {
pMTree->pIter = NULL; pMTree->pIter = NULL;
} }

View File

@ -3598,6 +3598,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
// let's try to extract data from stt files. // let's try to extract data from stt files.
terrno = 0;
ERetrieveType type = doReadDataFromSttFiles(pReader); ERetrieveType type = doReadDataFromSttFiles(pReader);
if (type == TSDB_READ_RETURN) { if (type == TSDB_READ_RETURN) {
return terrno; return terrno;

View File

@ -18,13 +18,13 @@
static volatile int32_t VINIT = 0; static volatile int32_t VINIT = 0;
int vnodeInit(int nthreads) { int vnodeInit(int nthreads, StopDnodeFp stopDnodeFp) {
if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) { if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) {
return 0; return 0;
} }
TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads)); TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads));
TAOS_CHECK_RETURN(walInit()); TAOS_CHECK_RETURN(walInit(stopDnodeFp));
return 0; return 0;
} }

View File

@ -856,8 +856,6 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* de
extern void doDestroyExchangeOperatorInfo(void* param); extern void doDestroyExchangeOperatorInfo(void* param);
int32_t doFilterImpl(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo,
SColumnInfoData** pResCol);
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache); int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache);

View File

@ -279,21 +279,20 @@ static int32_t doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBloc
return code; return code;
} }
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, static int32_t doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
int32_t rows) { int32_t rows) {
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { int32_t code = 0;
SExprSupp* pSup = &pTableScanInfo->pseudoSup; SExprSupp* pSup = &pTableScanInfo->pseudoSup;
if (pSup->numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
pTaskInfo, &pTableScanInfo->metaCache); pTaskInfo, &pTableScanInfo->metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure. // ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
T_LONG_JMP(pTaskInfo->env, code); code = 0;
} }
// reset the error code.
terrno = 0;
} }
return code;
} }
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
@ -373,10 +372,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
pBlockInfo->id.uid); pBlockInfo->id.uid);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
pCost->skipBlocks += 1; pCost->skipBlocks += 1;
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader); pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS; return code;
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
loadSMA = true; // mark the operation of load sma; loadSMA = true; // mark the operation of load sma;
@ -391,9 +390,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo), qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader); pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS; return code;
} else { } else {
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
*status = FUNC_DATA_REQUIRED_DATA_LOAD; *status = FUNC_DATA_REQUIRED_DATA_LOAD;
@ -473,7 +472,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qError("[loadDataBlock] p != pBlock"); qError("[loadDataBlock] p != pBlock");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
} }
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
if (code) {
return code;
}
// restore the previous value // restore the previous value
pCost->totalRows -= pBlock->info.rows; pCost->totalRows -= pBlock->info.rows;
@ -912,7 +915,8 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas
} }
// set tag/tbname // set tag/tbname
doSetTagColumnData(pBase, pBlock, pTaskInfo, 1); terrno = doSetTagColumnData(pBase, pBlock, pTaskInfo, 1);
return pBlock; return pBlock;
} }
@ -1633,7 +1637,9 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL); code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); code = doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
QUERY_CHECK_CODE(code, lino, _end);
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
} }
@ -2762,12 +2768,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
int32_t tmpCode = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr,
pInfo->pRes, pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache); pInfo->pRes, pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure. // ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (tmpCode != TSDB_CODE_SUCCESS && tmpCode != TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
code = 0;
}
if (code) {
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
T_LONG_JMP(pTaskInfo->env, code); QUERY_CHECK_CODE(code, lino, _end);
} }
// reset the error code. // reset the error code.

View File

@ -2573,13 +2573,14 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
for (int32_t i = 0; i < 2; ++i) { for (int32_t i = 0; i < 2; ++i) {
uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i))->type; uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i))->type;
if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType)) { if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
} }
uint8_t para2Type;
if (3 == numOfParams) { if (3 == numOfParams) {
if (!IS_INTEGER_TYPE(getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type)) { para2Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type;
if (!IS_INTEGER_TYPE(para2Type) && !IS_NULL_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
} }
@ -2587,7 +2588,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
// add database precision as param // add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
if (3 == numOfParams) { if (3 == numOfParams && !IS_NULL_TYPE(para2Type)) {
int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2)); int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2));
if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) { if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, code, return buildFuncErrMsg(pErrBuf, len, code,

View File

@ -2668,8 +2668,8 @@ int32_t weekdayFunctionImpl(SScalarParam *pInput, int32_t inputNum, SScalarParam
} }
struct STm tm; struct STm tm;
TAOS_CHECK_RETURN(taosTs2Tm(timeVal, timePrec, &tm)); TAOS_CHECK_RETURN(taosTs2Tm(timeVal, timePrec, &tm));
int32_t ret = startFromZero ? (tm.tm.tm_wday + 6) % 7 : tm.tm.tm_wday + 1; int64_t ret = startFromZero ? (tm.tm.tm_wday + 6) % 7 : tm.tm.tm_wday + 1;
SCL_ERR_RET(colDataSetVal(pOutput->columnData, i, (const char*)&ret, false)); colDataSetInt64(pOutput->columnData, i, &ret);
} }
pOutput->numOfRows = pInput->numOfRows; pOutput->numOfRows = pInput->numOfRows;
@ -2778,8 +2778,8 @@ int32_t weekFunctionImpl(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
} }
struct STm tm; struct STm tm;
SCL_ERR_RET(taosTs2Tm(timeVal, prec, &tm)); SCL_ERR_RET(taosTs2Tm(timeVal, prec, &tm));
int32_t ret = calculateWeekNum(tm.tm, weekMode(mode)); int64_t ret = calculateWeekNum(tm.tm, weekMode(mode));
SCL_ERR_RET(colDataSetVal(pOutput->columnData, i, (const char*)&ret, false)); colDataSetInt64(pOutput->columnData, i, &ret);
} }
pOutput->numOfRows = pInput->numOfRows; pOutput->numOfRows = pInput->numOfRows;

View File

@ -1354,7 +1354,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
return code; return code;
} }
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);

View File

@ -200,7 +200,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
if ((*pTask)->status.requireConsensusChkptId) { if ((*pTask)->status.requireConsensusChkptId) {
entry.checkpointInfo.consensusChkptId = 1; entry.checkpointInfo.consensusChkptId = 1;
(*pTask)->status.requireConsensusChkptId = false; (*pTask)->status.requireConsensusChkptId = false;
stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
} }
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {

View File

@ -37,12 +37,6 @@ typedef struct {
SHashObj* pTable; SHashObj* pTable;
} SMetaRefMgt; } SMetaRefMgt;
typedef struct STaskInitTs {
int64_t start;
int64_t end;
bool success;
} STaskInitTs;
SMetaRefMgt gMetaRefMgt; SMetaRefMgt gMetaRefMgt;
int32_t metaRefMgtInit(); int32_t metaRefMgtInit();
@ -405,15 +399,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
goto _err; goto _err;
} }
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); code = streamMetaInitStartInfo(&pMeta->startInfo);
if (pMeta->startInfo.pReadyTaskSet == NULL) { if (code) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pFailedTaskSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
@ -609,8 +596,8 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet); streamMetaClearStartInfo(&pMeta->startInfo);
destroyMetaHbInfo(pMeta->pHbInfo); destroyMetaHbInfo(pMeta->pHbInfo);
pMeta->pHbInfo = NULL; pMeta->pHbInfo = NULL;
@ -1051,7 +1038,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
} else { } else {
// todo this should replace the existed object put by replay creating stream task msg from mnode // todo this should replace the existed object put by replay creating stream task msg from mnode
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
taosMemoryFree(pTask); tFreeStreamTask(pTask);
continue; continue;
} }
@ -1191,18 +1178,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
streamMetaHbToMnode(pRid, NULL); streamMetaHbToMnode(pRid, NULL);
} }
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
taosHashClear(pStartInfo->pReadyTaskSet);
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0;
pStartInfo->elapsedTime = 0;
// reset the sentinel flag value to be 0
pStartInfo->startAllTasks = 0;
stDebug("vgId:%d clear start-all-task info", vgId);
}
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId); // stTrace("vgId:%d meta-rlock", pMeta->vgId);
(void)taosThreadRwlockRdlock(&pMeta->lock); (void)taosThreadRwlockRdlock(&pMeta->lock);
@ -1302,185 +1277,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
} }
} }
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
streamMetaWLock(pMeta);
if (pMeta->closeFlag) {
streamMetaWUnLock(pMeta);
stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
return TSDB_CODE_FAILED;
}
*pList = taosArrayDup(pMeta->pTaskList, NULL);
if (*pList == NULL) {
return terrno;
}
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = now;
int32_t code = streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
return code;
}
// restore the checkpoint id by negotiating the latest consensus checkpoint id
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int64_t now = taosGetTimestampMs();
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
if (numOfTasks == 0) {
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
if (code != TSDB_CODE_SUCCESS) {
ASSERT(pTaskList == NULL);
return TSDB_CODE_SUCCESS;
}
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
numOfTasks = taosArrayGetSize(pTaskList);
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization, when the operation of check downstream tasks status is executed far quickly.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
code = pMeta->expandTaskFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
}
}
streamMetaReleaseTask(pMeta, pTask);
}
// Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
STaskExecStatisInfo* pInfo = &pTask->execInfo;
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// ready now, start the related fill-history task
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
(void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
}
(void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
}
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// negotiate the consensus checkpoint id for current task
code = streamTaskSendRestoreChkptMsg(pTask);
// this task may has no checkpoint, but others tasks may generate checkpoint already?
streamMetaReleaseTask(pMeta, pTask);
}
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization, when the operation of check downstream tasks status is executed far quickly.
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
taosArrayDestroy(pTaskList);
return code;
}
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
streamMetaRLock(pMeta);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) {
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampMs();
// send hb msg to mnode before closing all tasks.
SArray* pTaskList = NULL;
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
(void)streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
streamMetaRUnLock(pMeta);
return 0;
}
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
@ -1499,196 +1295,6 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
return true; return true;
} }
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
SStreamTask* pTask = NULL;
bool continueExec = true;
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
(void)streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) {
stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
// the start all tasks procedure may happen to start the newly deployed stream task, and results in the
// concurrently start this task by two threads.
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__UNINIT) {
stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
continueExec = false;
} else {
continueExec = true;
}
streamMutexUnlock(&pTask->lock);
if (!continueExec) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
ASSERT(pTask->status.downstreamReady == 0);
// avoid initialization and destroy running concurrently.
streamMutexLock(&pTask->lock);
if (pTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pTask);
streamMutexUnlock(&pTask->lock);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
} else {
streamMutexUnlock(&pTask->lock);
}
// concurrently start task may cause the later started task be failed, and also failed to added into meta result.
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
tstrerror(code));
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
}
}
streamMetaReleaseTask(pMeta, pTask);
return code;
}
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;
size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
STaskInitTs* pInfo = pIter;
void* key = taosHashGetKey(pIter, &keyLen);
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
if (pTask1 == NULL) {
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
} else {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
}
}
}
// check all existed tasks are received rsp
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
for (int32_t i = 0; i < numOfTotal; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (pTaskId == NULL) {
continue;
}
STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
if (px == NULL) {
px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
if (px == NULL) {
return false;
}
}
}
return true;
}
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
int32_t vgId = pMeta->vgId;
bool allRsp = true;
streamMetaWLock(pMeta);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { // task does not exists in current vnode, not record the complete info
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
// clear the send consensus-checkpointId flag
streamMutexLock(&(*p)->lock);
(*p)->status.sendConsensusChkptId = false;
streamMutexUnlock(&(*p)->lock);
if (pStartInfo->startAllTasks != 1) {
int64_t el = endTs - startTs;
stDebug(
"vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
"time:%" PRId64 "ms",
vgId, taskId, ready, el);
streamMetaWUnLock(pMeta);
return 0;
}
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) {
if (code == TSDB_CODE_DUP_KEY) {
stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
" already exist start results in meta start task result hashmap",
vgId, id.taskId);
} else {
stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
}
streamMetaWUnLock(pMeta);
return code;
}
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal);
if (allRsp) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo, vgId);
streamMetaWUnLock(pMeta);
code = pStartInfo->completeFn(pMeta);
} else {
streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
numOfRecv, numOfTotal);
}
return code;
}
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);

View File

@ -0,0 +1,444 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tmisce.h"
#include "tref.h"
#include "tsched.h"
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
typedef struct STaskInitTs {
int64_t start;
int64_t end;
bool success;
} STaskInitTs;
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
// restore the checkpoint id by negotiating the latest consensus checkpoint id
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int64_t now = taosGetTimestampMs();
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
if (numOfTasks == 0) {
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
if (code != TSDB_CODE_SUCCESS) {
ASSERT(pTaskList == NULL);
return TSDB_CODE_SUCCESS;
}
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
numOfTasks = taosArrayGetSize(pTaskList);
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization, when the operation of check downstream tasks status is executed far quickly.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
code = pMeta->expandTaskFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
}
}
streamMetaReleaseTask(pMeta, pTask);
}
// Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
STaskExecStatisInfo* pInfo = &pTask->execInfo;
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// ready now, start the related fill-history task
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
(void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
}
(void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
}
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// negotiate the consensus checkpoint id for current task
code = streamTaskSendNegotiateChkptIdMsg(pTask);
// this task may has no checkpoint, but others tasks may generate checkpoint already?
streamMetaReleaseTask(pMeta, pTask);
}
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization, when the operation of check downstream tasks status is executed far quickly.
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
taosArrayDestroy(pTaskList);
return code;
}
int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
streamMetaWLock(pMeta);
if (pMeta->closeFlag) {
streamMetaWUnLock(pMeta);
stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
return TSDB_CODE_FAILED;
}
*pList = taosArrayDup(pMeta->pTaskList, NULL);
if (*pList == NULL) {
return terrno;
}
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = now;
int32_t code = streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
return code;
}
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
taosHashClear(pStartInfo->pReadyTaskSet);
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0;
pStartInfo->elapsedTime = 0;
// reset the sentinel flag value to be 0
pStartInfo->startAllTasks = 0;
stDebug("vgId:%d clear start-all-task info", vgId);
}
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
int32_t vgId = pMeta->vgId;
bool allRsp = true;
streamMetaWLock(pMeta);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { // task does not exists in current vnode, not record the complete info
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
// clear the send consensus-checkpointId flag
streamMutexLock(&(*p)->lock);
(*p)->status.sendConsensusChkptId = false;
streamMutexUnlock(&(*p)->lock);
if (pStartInfo->startAllTasks != 1) {
int64_t el = endTs - startTs;
stDebug(
"vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
"time:%" PRId64 "ms",
vgId, taskId, ready, el);
streamMetaWUnLock(pMeta);
return 0;
}
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) {
if (code == TSDB_CODE_DUP_KEY) {
stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
" already exist start results in meta start task result hashmap",
vgId, id.taskId);
} else {
stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
}
streamMetaWUnLock(pMeta);
return code;
}
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal);
if (allRsp) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo, vgId);
streamMetaWUnLock(pMeta);
code = pStartInfo->completeFn(pMeta);
} else {
streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
numOfRecv, numOfTotal);
}
return code;
}
// check all existed tasks are received rsp
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
for (int32_t i = 0; i < numOfTotal; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (pTaskId == NULL) {
continue;
}
STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
if (px == NULL) {
px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
if (px == NULL) {
return false;
}
}
}
return true;
}
void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;
size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
STaskInitTs* pInfo = pIter;
void* key = taosHashGetKey(pIter, &keyLen);
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
if (pTask1 == NULL) {
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
} else {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
}
}
}
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pStartInfo->pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pStartInfo->pReadyTaskSet == NULL) {
return terrno;
}
pStartInfo->pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
if (pStartInfo->pFailedTaskSet == NULL) {
return terrno;
}
return 0;
}
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
taosHashCleanup(pStartInfo->pReadyTaskSet);
taosHashCleanup(pStartInfo->pFailedTaskSet);
pStartInfo->readyTs = 0;
pStartInfo->elapsedTime = 0;
pStartInfo->startTs = 0;
pStartInfo->startAllTasks = 0;
pStartInfo->tasksWillRestart = 0;
pStartInfo->restartCount = 0;
}
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
SStreamTask* pTask = NULL;
bool continueExec = true;
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
(void)streamMetaAddFailedTask(pMeta, streamId, taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) {
stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
// the start all tasks procedure may happen to start the newly deployed stream task, and results in the
// concurrently start this task by two threads.
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__UNINIT) {
stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
continueExec = false;
} else {
continueExec = true;
}
streamMutexUnlock(&pTask->lock);
if (!continueExec) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
ASSERT(pTask->status.downstreamReady == 0);
// avoid initialization and destroy running concurrently.
streamMutexLock(&pTask->lock);
if (pTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pTask);
streamMutexUnlock(&pTask->lock);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
} else {
streamMutexUnlock(&pTask->lock);
}
// concurrently start task may cause the later started task be failed, and also failed to added into meta result.
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
tstrerror(code));
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
}
}
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
streamMetaRLock(pMeta);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) {
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampMs();
// send hb msg to mnode before closing all tasks.
SArray* pTaskList = NULL;
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
(void)streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
streamMetaRUnLock(pMeta);
return 0;
}

View File

@ -14,6 +14,8 @@
*/ */
#include "executor.h" #include "executor.h"
#include "osDir.h"
#include "osMemory.h"
#include "streamInt.h" #include "streamInt.h"
#include "streamsm.h" #include "streamsm.h"
#include "tmisce.h" #include "tmisce.h"
@ -30,7 +32,7 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray); int32_t childId = taosArrayGetSize(pArray);
pTask->info.selfChildId = childId; pTask->info.selfChildId = childId;
void* p = taosArrayPush(pArray, &pTask); void* p = taosArrayPush(pArray, &pTask);
return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
} }
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
@ -42,7 +44,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
if (!isEqual) { if (!isEqual) {
(*pUpdated) = true; (*pUpdated) = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors (void)epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors
epsetAssign(&pTask->info.epSet, pEpSet); epsetAssign(&pTask->info.epSet, pEpSet);
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
@ -92,7 +94,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
} }
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) {
*p = NULL; *p = NULL;
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
@ -224,17 +226,17 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
if (pTask->schedInfo.pDelayTimer != NULL) { if (pTask->schedInfo.pDelayTimer != NULL) {
(void) taosTmrStop(pTask->schedInfo.pDelayTimer); (void)taosTmrStop(pTask->schedInfo.pDelayTimer);
pTask->schedInfo.pDelayTimer = NULL; pTask->schedInfo.pDelayTimer = NULL;
} }
if (pTask->hTaskInfo.pTimer != NULL) { if (pTask->hTaskInfo.pTimer != NULL) {
(void) taosTmrStop(pTask->hTaskInfo.pTimer); (void)taosTmrStop(pTask->hTaskInfo.pTimer);
pTask->hTaskInfo.pTimer = NULL; pTask->hTaskInfo.pTimer = NULL;
} }
if (pTask->msgInfo.pRetryTmr != NULL) { if (pTask->msgInfo.pRetryTmr != NULL) {
(void) taosTmrStop(pTask->msgInfo.pRetryTmr); (void)taosTmrStop(pTask->msgInfo.pRetryTmr);
pTask->msgInfo.pRetryTmr = NULL; pTask->msgInfo.pRetryTmr = NULL;
} }
@ -296,15 +298,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
pTask->outputInfo.pNodeEpsetUpdateList = NULL; pTask->outputInfo.pNodeEpsetUpdateList = NULL;
// if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
// char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128);
// sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr);
// taosRemoveDir(path);
// stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path);
// taosMemoryFree(path);
// }
if (pTask->id.idStr != NULL) { if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr); taosMemoryFree((void*)pTask->id.idStr);
} }
@ -321,10 +314,21 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
stDebug("s-task:0x%x start to free task state", pTask->id.taskId); stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
streamStateClose(pTask->pState, remove); streamStateClose(pTask->pState, remove);
if (remove)taskDbSetClearFileFlag(pTask->pBackend); if (remove) taskDbSetClearFileFlag(pTask->pBackend);
taskDbRemoveRef(pTask->pBackend); taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL; pTask->pBackend = NULL;
pTask->pState = NULL; pTask->pState = NULL;
} else {
if (remove) {
if (pTask->backendPath != NULL) {
taosRemoveDir(pTask->backendPath);
}
}
}
if (pTask->backendPath != NULL) {
taosMemoryFree(pTask->backendPath);
pTask->backendPath = NULL;
} }
} }
@ -364,8 +368,36 @@ static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
} }
} }
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
int64_t streamId = 0;
int32_t taskId = 0;
if (pTask->info.fillHistory) {
streamId = pTask->hTaskInfo.id.taskId;
taskId = pTask->hTaskInfo.id.taskId;
} else {
streamId = pTask->streamTaskId.taskId;
taskId = pTask->streamTaskId.taskId;
}
char id[128] = {0};
int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId);
if (nBytes < 0 || nBytes >= sizeof(id)) {
return TSDB_CODE_OUT_OF_BUFFER;
}
int32_t len = strlen(pTask->pMeta->path);
pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
if (pTask->backendPath == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
return 0;
}
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
(void) createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); (void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
@ -459,10 +491,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
} }
if (pTask->chkInfo.pActiveInfo == NULL) { if (pTask->chkInfo.pActiveInfo == NULL) {
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
if (code) {
stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
} }
return code; return streamTaskSetBackendPath(pTask);
} }
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
@ -494,12 +530,12 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
} }
void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo); void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
} }
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0}; char buf[512] = {0};
(void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file.
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) { for (int32_t i = 0; i < numOfUpstream; ++i) {
@ -510,7 +546,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
*pUpdated = true; *pUpdated = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); (void)epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pInfo->epSet, pEpSet); epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId, stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
@ -545,7 +581,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0}; char buf[512] = {0};
(void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files.
int32_t id = pTask->id.taskId; int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type; int8_t type = pTask->outputInfo.type;
@ -564,7 +600,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
if (!isEqual) { if (!isEqual) {
*pUpdated = true; *pUpdated = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); (void)epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pVgInfo->epSet, pEpSet); epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId, stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
@ -584,7 +620,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
*pUpdated = true; *pUpdated = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); (void)epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
epsetAssign(&pDispatcher->epSet, pEpSet); epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId, stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
@ -919,7 +955,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t code = 0; int32_t code = 0;
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num); stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
@ -935,7 +971,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
} }
void streamTaskPause(SStreamTask* pTask) { void streamTaskPause(SStreamTask* pTask) {
(void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
} }
void streamTaskResume(SStreamTask* pTask) { void streamTaskResume(SStreamTask* pTask) {
@ -1142,13 +1178,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr; SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
if (pTriggerTmr->tmrHandle != NULL) { if (pTriggerTmr->tmrHandle != NULL) {
(void) taosTmrStop(pTriggerTmr->tmrHandle); (void)taosTmrStop(pTriggerTmr->tmrHandle);
pTriggerTmr->tmrHandle = NULL; pTriggerTmr->tmrHandle = NULL;
} }
SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr; SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
if (pReadyTmr->tmrHandle != NULL) { if (pReadyTmr->tmrHandle != NULL) {
(void) taosTmrStop(pReadyTmr->tmrHandle); (void)taosTmrStop(pReadyTmr->tmrHandle);
pReadyTmr->tmrHandle = NULL; pReadyTmr->tmrHandle = NULL;
} }
@ -1182,7 +1218,9 @@ const char* streamTaskGetExecType(int32_t type) {
return "resume-task-from-idle"; return "resume-task-from-idle";
case STREAM_EXEC_T_ADD_FAILED_TASK: case STREAM_EXEC_T_ADD_FAILED_TASK:
return "record-start-failed-task"; return "record-start-failed-task";
case 0:
return "exec-all-tasks";
default: default:
return "invalid-exec-type"; return "invalid-exec-type";
} }
} }

View File

@ -26,6 +26,7 @@ typedef struct {
uint32_t seq; uint32_t seq;
int32_t refSetId; int32_t refSetId;
TdThread thread; TdThread thread;
stopDnodeFn stopDnode;
} SWalMgmt; } SWalMgmt;
static SWalMgmt tsWal = {0, .seq = 1}; static SWalMgmt tsWal = {0, .seq = 1};
@ -35,7 +36,7 @@ static void walFreeObj(void *pWal);
int64_t walGetSeq() { return (int64_t)atomic_load_32((volatile int32_t *)&tsWal.seq); } int64_t walGetSeq() { return (int64_t)atomic_load_32((volatile int32_t *)&tsWal.seq); }
int32_t walInit() { int32_t walInit(stopDnodeFn stopDnode) {
int8_t old; int8_t old;
while (1) { while (1) {
old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2); old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 2);
@ -57,6 +58,11 @@ int32_t walInit() {
atomic_store_8(&tsWal.inited, 1); atomic_store_8(&tsWal.inited, 1);
} }
if (stopDnode == NULL) {
wWarn("failed to set stop dnode call back");
}
tsWal.stopDnode = stopDnode;
return 0; return 0;
} }
@ -164,6 +170,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
goto _err; goto _err;
} }
pWal->stopDnode = tsWal.stopDnode;
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
pWal->cfg.fsyncPeriod); pWal->cfg.fsyncPeriod);
return pWal; return pWal;

View File

@ -525,6 +525,11 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
if (size != sizeof(SWalIdxEntry)) { if (size != sizeof(SWalIdxEntry)) {
wError("vgId:%d, failed to write idx entry due to %s. ver:%" PRId64, pWal->cfg.vgId, strerror(errno), ver); wError("vgId:%d, failed to write idx entry due to %s. ver:%" PRId64, pWal->cfg.vgId, strerror(errno), ver);
if (pWal->stopDnode != NULL) {
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
pWal->stopDnode();
}
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
} }
@ -571,6 +576,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
if (pWal->stopDnode != NULL) {
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
pWal->stopDnode();
}
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_CHECK_GOTO(code, &lino, _exit);
} }
@ -627,6 +637,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
taosMemoryFreeClear(newBodyEncrypted); taosMemoryFreeClear(newBodyEncrypted);
} }
if (pWal->stopDnode != NULL) {
wWarn("vgId:%d, set stop dnode flag", pWal->cfg.vgId);
pWal->stopDnode();
}
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_CHECK_GOTO(code, &lino, _exit);
} }

View File

@ -12,7 +12,7 @@ SWalSyncInfo syncMeta = {0};
class WalCleanEnv : public ::testing::Test { class WalCleanEnv : public ::testing::Test {
protected: protected:
static void SetUpTestCase() { static void SetUpTestCase() {
int code = walInit(); int code = walInit(NULL);
ASSERT(code == 0); ASSERT(code == 0);
} }
@ -44,7 +44,7 @@ class WalCleanEnv : public ::testing::Test {
class WalCleanDeleteEnv : public ::testing::Test { class WalCleanDeleteEnv : public ::testing::Test {
protected: protected:
static void SetUpTestCase() { static void SetUpTestCase() {
int code = walInit(); int code = walInit(NULL);
ASSERT(code == 0); ASSERT(code == 0);
} }
@ -74,7 +74,7 @@ class WalCleanDeleteEnv : public ::testing::Test {
class WalKeepEnv : public ::testing::Test { class WalKeepEnv : public ::testing::Test {
protected: protected:
static void SetUpTestCase() { static void SetUpTestCase() {
int code = walInit(); int code = walInit(NULL);
ASSERT(code == 0); ASSERT(code == 0);
} }
@ -111,7 +111,7 @@ class WalKeepEnv : public ::testing::Test {
class WalRetentionEnv : public ::testing::Test { class WalRetentionEnv : public ::testing::Test {
protected: protected:
static void SetUpTestCase() { static void SetUpTestCase() {
int code = walInit(); int code = walInit(NULL);
ASSERT(code == 0); ASSERT(code == 0);
} }

View File

@ -1909,6 +1909,8 @@ class TDCom:
if latency < self.stream_timeout: if latency < self.stream_timeout:
latency += 1 latency += 1
time.sleep(1) time.sleep(1)
else:
return False
return tbname return tbname
def get_group_id_from_stb(self, stbname): def get_group_id_from_stb(self, stbname):