fix(stream): reduce the consensus checkpoint id trans. (#30105)

* fix(stream): reduce the consensus checkpoint id trans.

* refactor(stream): add some logs.

* refactor(stream): set the max checkpoint exec time 30min.

* refactor(stream): add checkpoint-consensus trans conflict check.

* refactor(stream): remove unused local variables.

* fix(stream): fix syntax error.

* fix(stream): 1. fix free memory error 2. continue if put result into dst hashmap failed.

* fix issue

* fix issue

* fix(mnd): follower mnode not processes the timer event.

* fix(stream): print correct error msg.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): truncate long subtable name

* fix(stream): add buffer len.

* refactor(stream): update some logs.

* fix issue

* refactor(stream): update some logs.

* refactor(stream): update some logs.

* fix(stream): check return value.

* fix(stream): fix syntax error.

* fix(stream): check return value.

* fix(stream): update the timer check in mnode.

* fix(stream): add restart stage tracking.

* fix(stream): track the start task stage for meta.

* fix(stream): fix error in log.

* refactor(stream): adjust log info.

* fix mem issue

* fix(stream): check the number of required tasks for consensus checkpointId.

* fix(stream): lock the whole start procedure.

* fix(stream): add lock during start all tasks.

* fix(stream): update logs.

* fix(stream): update logs.

* fix(stream): update logs.

* fix(stream): fix dead-lock.

* fix(stream): fix syntax error.

* fix(stream): not drop the scan-history task.

* fix(stream): fix syntax error.

* fix(stream): wait for executor stop before restarting.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): disable some logs.

* fix(stream): reset the start info if no task left.

---------

Co-authored-by: 54liuyao <54liuyao@163.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Haojun Liao 2025-03-17 10:20:17 +08:00 committed by GitHub
parent 0806cca09e
commit ab92886820
GPG Key ID: B5690EEEBB952194
38 changed files with 798 additions and 450 deletions

View File

@ -241,6 +241,7 @@ typedef struct SRestoreCheckpointInfo {
int32_t transId; // transaction id of the update the consensus-checkpointId transaction
int32_t taskId;
int32_t nodeId;
int32_t term;
} SRestoreCheckpointInfo;
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);

View File

@ -465,6 +465,17 @@ struct SStreamTask {
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
typedef enum {
START_MARK_REQ_CHKPID = 0x1,
START_WAIT_FOR_CHKPTID = 0x2,
START_CHECK_DOWNSTREAM = 0x3,
} EStartStage;
typedef struct {
EStartStage stage;
int64_t ts;
} SStartTaskStageInfo;
typedef struct STaskStartInfo {
int64_t startTs;
int64_t readyTs;
@ -474,6 +485,8 @@ typedef struct STaskStartInfo {
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
int64_t elapsedTime;
int32_t restartCount; // restart task counter
EStartStage curStage; // task start stage
SArray* pStagesList; // history stage list with timestamp, SArray<SStartTaskStageInfo>
startComplete_fn_t completeFn; // complete callback function
} STaskStartInfo;
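The curStage and pStagesList fields above record which phase of the task-start procedure a vnode is currently in, together with a timestamped history of the transitions. A minimal sketch of how such a transition could be recorded, assuming the types above and TDengine's taosArrayPush; the helper name itself is illustrative, not part of this patch:
static void advanceStartStage(STaskStartInfo *pInfo, EStartStage next, int64_t nowMs) {
  SStartTaskStageInfo info = {.stage = next, .ts = nowMs};
  pInfo->curStage = next;  // remember which stage the start procedure is in now
  // keep a timestamped history of transitions for later diagnostics
  if (taosArrayPush(pInfo->pStagesList, &info) == NULL) {
    // out of memory: the history entry is dropped, curStage is still updated
  }
}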
@ -706,7 +719,7 @@ void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
// fill-history task
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask, bool lock);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
@ -777,12 +790,14 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready);
int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId,
int64_t startTs, int64_t endTs, bool ready);
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo);
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, bool lock);
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs, bool lock);
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs);
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta);

View File

@ -794,6 +794,7 @@ _exit:
return code;
}
// todo: serialize the term attribute.
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
int32_t code = 0;
int32_t lino;

View File

@ -2896,31 +2896,18 @@ _end:
}
// Construct the child table name in the form of <ctbName>_<stbName>_<groupId> and store it in `ctbName`.
// If the name length exceeds TSDB_TABLE_NAME_LEN, first convert <stbName>_<groupId> to an MD5 value and then
// concatenate. If the length is still too long, convert <ctbName> to an MD5 value as well.
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
char tmp[TSDB_TABLE_NAME_LEN] = {0};
char* suffix = tmp;
size_t suffixCap = sizeof(tmp);
size_t suffixLen = 0;
size_t prefixLen = 0;
T_MD5_CTX context;
if (ctbName == NULL || cap < TSDB_TABLE_NAME_LEN) {
code = TSDB_CODE_INTERNAL_ERROR;
TSDB_CHECK_CODE(code, lino, _end);
}
prefixLen = strlen(ctbName);
if (stbName == NULL) {
suffixLen = snprintf(suffix, suffixCap, "%" PRIu64, groupId);
if (suffixLen >= suffixCap) {
code = TSDB_CODE_INTERNAL_ERROR;
TSDB_CHECK_CODE(code, lino, _end);
}
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
} else {
int32_t i = strlen(stbName) - 1;
for (; i >= 0; i--) {
@ -2928,52 +2915,12 @@ int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t grou
break;
}
}
suffixLen = snprintf(suffix, suffixCap, "%s_%" PRIu64, stbName + i + 1, groupId);
if (suffixLen >= suffixCap) {
suffixCap = suffixLen + 1;
suffix = taosMemoryMalloc(suffixCap);
TSDB_CHECK_NULL(suffix, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
suffixLen = snprintf(suffix, suffixCap, "%s_%" PRIu64, stbName + i + 1, groupId);
if (suffixLen >= suffixCap) {
code = TSDB_CODE_INTERNAL_ERROR;
TSDB_CHECK_CODE(code, lino, _end);
}
}
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName + i + 1, groupId);
}
if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) {
// If the name length exceeeds the limit, convert the suffix to MD5 value.
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)suffix, suffixLen);
tMD5Final(&context);
suffixLen = snprintf(suffix, suffixCap, "%016" PRIx64 "%016" PRIx64, *(uint64_t*)context.digest,
*(uint64_t*)(context.digest + 8));
if (suffixLen >= suffixCap) {
code = TSDB_CODE_INTERNAL_ERROR;
TSDB_CHECK_CODE(code, lino, _end);
}
}
if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) {
// If the name is still too long, convert the ctbName to MD5 value.
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)ctbName, prefixLen);
tMD5Final(&context);
prefixLen = snprintf(ctbName, cap, "t_%016" PRIx64 "%016" PRIx64, *(uint64_t*)context.digest,
*(uint64_t*)(context.digest + 8));
if (prefixLen >= cap) {
code = TSDB_CODE_INTERNAL_ERROR;
TSDB_CHECK_CODE(code, lino, _end);
}
}
if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) {
code = TSDB_CODE_INTERNAL_ERROR;
TSDB_CHECK_CODE(code, lino, _end);
}
ctbName[prefixLen] = '_';
tstrncpy(&ctbName[prefixLen + 1], suffix, cap - prefixLen - 1);
ctbName[cap - strlen(tmp) - 1] = 0; // put stbname + groupId to the end
size_t prefixLen = strlen(ctbName);
ctbName = strncat(ctbName, tmp, cap - prefixLen - 1);
for (char* p = ctbName; *p; ++p) {
if (*p == '.') *p = '_';
@ -2983,9 +2930,6 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (suffix != tmp) {
taosMemoryFree(suffix);
}
return code;
}
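The rewritten buildCtbNameAddGroupId above drops the earlier MD5 fallback: it builds the "_<stbName>_<groupId>" suffix in a temporary buffer and truncates the existing ctbName prefix so the suffix always fits within the capacity. A standalone sketch of that truncation idea under a fixed capacity (NAME_CAP stands in for TSDB_TABLE_NAME_LEN; the helper is illustrative, not the project code):
#include <stdio.h>
#include <string.h>

#define NAME_CAP 193  /* stand-in for TSDB_TABLE_NAME_LEN */

static void appendGroupSuffix(char *ctbName, size_t cap, const char *stbName,
                              unsigned long long groupId) {
  char suffix[NAME_CAP] = {0};
  (void)snprintf(suffix, sizeof(suffix), "_%s_%llu", stbName, groupId);

  // reserve room for the whole suffix: cut the prefix so the suffix lands at the end
  size_t room = cap - strlen(suffix) - 1;
  if (strlen(ctbName) > room) {
    ctbName[room] = '\0';
  }
  (void)strncat(ctbName, suffix, cap - strlen(ctbName) - 1);
}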

View File

@ -531,7 +531,6 @@ TEST(testCase, StreamWithoutDotInStbName2) {
TEST(testCase, StreamWithLongStbName) {
char ctbName[TSDB_TABLE_NAME_LEN];
char expectName[TSDB_TABLE_NAME_LEN];
char *stbName = "a_simle_stb_name";
uint64_t groupId = UINT64_MAX;
@ -550,29 +549,13 @@ TEST(testCase, StreamWithLongStbName) {
EXPECT_EQ(buildCtbNameAddGroupId(stbName, NULL, groupId, sizeof(ctbName)), TSDB_CODE_INTERNAL_ERROR);
EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName) - 1), TSDB_CODE_INTERNAL_ERROR);
// test md5 conversion of stbName with groupid
// test truncation of long ctbName
for (int32_t i = 0; i < 159; ++i) ctbName[i] = 'A';
ctbName[159] = '\0';
stbName = taosStrdup(ctbName);
snprintf(expectName, TSDB_TABLE_NAME_LEN, "%s_d85f0d87946d76eeedd7b7b78b7492a2", ctbName);
std::string expectName = std::string(ctbName) + "_" + std::string(stbName) + "_" + std::to_string(groupId);
EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName)), TSDB_CODE_SUCCESS);
EXPECT_STREQ(ctbName, expectName);
// test md5 conversion of all parts
for (int32_t i = 0; i < 190; ++i) ctbName[i] = 'A';
ctbName[190] = '\0';
tstrncpy(expectName, "t_d38a8b2df999bef0082ffc80a59a9cd7_d85f0d87946d76eeedd7b7b78b7492a2", TSDB_TABLE_NAME_LEN);
EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName)), TSDB_CODE_SUCCESS);
EXPECT_STREQ(ctbName, expectName);
// test larger stbName
taosMemoryFree(stbName);
for (int32_t i = 0; i < 190; ++i) ctbName[i] = 'A';
ctbName[190] = '\0';
stbName = taosStrdup(ctbName);
tstrncpy(expectName, "t_d38a8b2df999bef0082ffc80a59a9cd7_9c99cc7c52073b63fb750af402d9b84b", TSDB_TABLE_NAME_LEN);
EXPECT_EQ(buildCtbNameAddGroupId(stbName, ctbName, groupId, sizeof(ctbName)), TSDB_CODE_SUCCESS);
EXPECT_STREQ(ctbName, expectName);
EXPECT_STREQ(ctbName, expectName.c_str() + expectName.size() - TSDB_TABLE_NAME_LEN + 1);
taosMemoryFree(stbName);
}

View File

@ -122,7 +122,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList);
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList, SHashObj* pTermMap);
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
@ -147,14 +147,13 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *p
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int64_t ts);
int32_t mndStreamSetChkptIdAction(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t checkpointId, SArray *pList);
int32_t mndStreamSetRestartAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
int64_t ts);
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, SArray* pList);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
SVgroupChangeInfo *pInfo);

View File

@ -452,19 +452,18 @@ static void *mndThreadFp(void *param) {
while (1) {
lastTime++;
taosMsleep(100);
if (mndGetStop(pMnode)) break;
if (lastTime % 10 != 0) continue;
if (mnodeIsNotLeader(pMnode)) {
mTrace("timer not process since mnode is not leader");
continue;
}
int64_t sec = lastTime / 10;
mndDoTimerCheckTask(pMnode, sec);
int64_t minCron = minCronTime();
if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) {
// not leader, do nothing
mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno));
terrno = 0;
continue;
}
mndDoTimerPullupTask(pMnode, sec);
}

View File

@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndStream.h"
#include "audit.h"
#include "mndDb.h"
#include "mndPrivilege.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndTrans.h"
#include "osMemory.h"
#include "parser.h"
@ -1228,18 +1228,16 @@ static int32_t streamWaitComparFn(const void *p1, const void *p2) {
}
// all tasks of this stream should be ready, otherwise do nothing
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
static bool isStreamReadyHelp(int64_t now, SStreamObj *pStream) {
bool ready = false;
streamMutexLock(&execInfo.lock);
int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
if (lastReadyTs != -1) {
mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
"ms less than threshold",
pStream->uid, lastReadyTs, (now - lastReadyTs));
mInfo("not start checkpoint, stream:0x%" PRIx64 " readyTs:%" PRId64 " ready duration:%.2fs less than threshold",
pStream->uid, lastReadyTs, (now - lastReadyTs) / 1000.0);
}
ready = false;
@ -1901,11 +1899,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, STrans** pUpdateTrans) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
STrans *pTrans = NULL;
int32_t code = 0;
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes,
STrans **pUpdateTrans, SArray* pStreamList) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
STrans *pTrans = NULL;
int32_t code = 0;
*pUpdateTrans = NULL;
// conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
@ -1974,6 +1973,10 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
}
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code == 0) {
taosArrayPush(pStreamList, &pStream->uid);
}
sdbRelease(pSdb, pStream);
if (code != TSDB_CODE_SUCCESS) {
@ -2152,7 +2155,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, NULL);
if (code) {
mError("failed to take the vgroup snapshot, ignore it and continue");
}
@ -2176,10 +2179,27 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
mDebug("vnode(s) change detected, build trans to update stream task epsets");
STrans *pTrans = NULL;
SArray* pStreamIdList = taosArrayInit(4, sizeof(int64_t));
streamMutexLock(&execInfo.lock);
code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans);
code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans, pStreamIdList);
// remove the consensus-checkpoint-id req of all related stream(s)
int32_t num = taosArrayGetSize(pStreamIdList);
if (num > 0) {
mDebug("start to clear %d related stream in consensus-checkpoint-id list due to nodeUpdate", num);
for (int32_t x = 0; x < num; ++x) {
int64_t uid = *(int64_t *)taosArrayGet(pStreamIdList, x);
int32_t ret = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, uid);
if (ret != 0) {
mError("failed to remove stream:0x%" PRIx64 " from consensus-checkpoint-id list, code:%s", uid,
tstrerror(ret));
}
}
}
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamIdList);
// NOTE: sync trans out of lock
if (code == 0 && pTrans != NULL) {
@ -2385,8 +2405,9 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
if (pStream != NULL) { // TODO:handle error
code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
if (code) {
mError("failed to create checkpoint trans, code:%s", tstrerror(code));
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:0x%" PRIx64 " failed to create checkpoint trans, checkpointId:%" PRId64 ", code:%s",
req.streamId, checkpointId, tstrerror(code));
}
} else {
// todo: wait for the create stream trans completed, and launch the checkpoint trans
@ -2394,11 +2415,15 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
// sleep(500ms)
}
// remove this entry
(void) taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
// remove this entry, not overwriting the global error code
int32_t ret = taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
if (ret) {
mError("failed to remove transfer state stream, code:%s", tstrerror(ret));
}
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
mDebug("stream:0x%" PRIx64 " removed in transfer-state list, %d stream(s) not finish fill-history process",
req.streamId, numOfStreams);
}
if (pStream != NULL) {
@ -2475,7 +2500,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const
pReport->taskId, p->checkpointId, pReport->checkpointId);
} else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it
mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
pReport->taskId, p->checkpointId, pReport->checkpointId);
pReport->taskId, p->checkpointId, pReport->checkpointId);
// update the checkpoint report info
p->checkpointId = pReport->checkpointId;
@ -2612,6 +2637,8 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t *pEx
if (chkId > pe->checkpointInfo.latestId) {
if (chkId != INT64_MAX) {
*pAllSame = false;
mDebug("checkpointIds not identical, prev:%" PRId64 " smaller:%" PRId64 " from task:0x%" PRIx64, chkId,
pe->checkpointInfo.latestId, pe->id.taskId);
}
chkId = pe->checkpointInfo.latestId;
}
@ -2637,7 +2664,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
}
}
static int32_t doCleanReqList(SArray* pList, SCheckpointConsensusInfo* pInfo) {
static int32_t doCleanReqList(SArray *pList, SCheckpointConsensusInfo *pInfo) {
int32_t alreadySend = taosArrayGetSize(pList);
for (int32_t i = 0; i < alreadySend; ++i) {
@ -2663,7 +2690,6 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
int64_t now = taosGetTimestampMs();
bool allReady = true;
SArray *pNodeSnapshot = NULL;
int32_t maxAllowedTrans = 20;
int32_t numOfTrans = 0;
int32_t code = 0;
void *pIter = NULL;
@ -2679,9 +2705,16 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
return terrno;
}
SHashObj* pTermMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pTermMap == NULL) {
taosArrayDestroy(pList);
taosArrayDestroy(pStreamList);
return terrno;
}
mDebug("start to process consensus-checkpointId in tmr");
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot, pTermMap);
taosArrayDestroy(pNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
@ -2691,6 +2724,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
taosArrayDestroy(pStreamList);
taosArrayDestroy(pList);
taosHashCleanup(pTermMap);
return 0;
}
@ -2717,31 +2751,62 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
continue;
}
if (pStream->uid != pInfo->streamId) {
// todo remove it
}
if ((num < pInfo->numOfTasks) || (pInfo->numOfTasks == 0)) {
mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-consensus req(not all), ignore", pStream->uid,
pStream->name, num, pInfo->numOfTasks);
mndReleaseStream(pMnode, pStream);
continue;
}
streamId = pStream->uid;
int32_t existed = 0;
bool allSame = true;
int64_t chkId = getConsensusId(pInfo->streamId, pInfo->numOfTasks, &existed, &allSame);
if (chkId == -1) {
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again", existed, pInfo->numOfTasks);
mndReleaseStream(pMnode, pStream);
continue;
}
bool allQualified = true;
for (int32_t j = 0; j < num; ++j) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
if (pe == NULL) {
continue;
}
if (streamId == -1) {
streamId = pe->req.streamId;
}
int32_t existed = 0;
bool allSame = true;
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame);
if (chkId == -1) {
mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed,
pInfo->numOfTasks, pe->req.taskId);
break;
if (pe->req.nodeId != -2) {
int32_t *pTerm = taosHashGet(pTermMap, &(pe->req.nodeId), sizeof(pe->req.nodeId));
if (pTerm == NULL) {
mError("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d not found in termMap", pe->req.streamId,
pe->req.taskId, pe->req.nodeId);
allQualified = false;
continue;
} else {
if (*pTerm != pe->req.term) {
mWarn("stream:0x%" PRIx64 " s-task:0x%x req from vgId:%d is expired, term:%d, current term:%d",
pe->req.streamId, pe->req.taskId, pe->req.nodeId, pe->req.term, *pTerm);
allQualified = false;
continue;
}
}
}
if (((now - pe->ts) >= 10 * 1000) || allSame) {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
mDebug("s-task:0x%x vgId:%d term:%d sendTs:%" PRId64 " wait %.2fs or all tasks have same checkpointId:%" PRId64, pe->req.taskId,
pe->req.nodeId, pe->req.term, pe->req.startTs, (now - pe->ts) / 1000.0, chkId);
if (chkId > pe->req.checkpointId) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
taosArrayDestroy(pList);
taosHashCleanup(pTermMap);
mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
pe->req.checkpointId, chkId);
@ -2750,42 +2815,38 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
return TSDB_CODE_FAILED;
}
// todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed.
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
}
void *p = taosArrayPush(pList, &pe->req.taskId);
if (p == NULL) {
mError("failed to put into task list, taskId:0x%x", pe->req.taskId);
}
} else {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0);
allQualified = false;
}
}
if (allQualified) {
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_CONSEN_NAME, false);
if (code == 0) {
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, chkId, pInfo->pTaskList);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
} else {
numOfTrans += 1;
mndClearConsensusRspEntry(pInfo);
void *p = taosArrayPush(pStreamList, &streamId);
if (p == NULL) {
mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list",
streamId);
}
}
} else {
mDebug("stream:0x%" PRIx64 "not create chktp-consensus, due to trans conflict", pStream->uid);
}
}
mndReleaseStream(pMnode, pStream);
int32_t alreadySend = doCleanReqList(pList, pInfo);
// clear request stream item with empty task list
if (taosArrayGetSize(pInfo->pTaskList) == 0) {
mndClearConsensusRspEntry(pInfo);
if (streamId == -1) {
mError("streamId is -1, streamId:%" PRIx64" in consensus-checkpointId hashMap, cont", pInfo->streamId);
}
void *p = taosArrayPush(pStreamList, &streamId);
if (p == NULL) {
mError("failed to put into stream list, stream:0x%" PRIx64 " not remove it in consensus-chkpt list", streamId);
}
}
numOfTrans += alreadySend;
if (numOfTrans > maxAllowedTrans) {
mInfo("already send consensus-checkpointId trans:%d, try next time", alreadySend);
// create one transaction each time
if (numOfTrans > 0) {
taosHashCancelIterate(execInfo.pStreamConsensus, pIter);
break;
}
@ -2804,6 +2865,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
taosArrayDestroy(pStreamList);
taosArrayDestroy(pList);
taosHashCleanup(pTermMap);
mDebug("end to process consensus-checkpointId in tmr, send consensus-checkpoint trans:%d", numOfTrans);
return code;
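The timer above now creates a single consensus-checkpoint trans covering all tasks of a stream (and at most one trans per timer pass), and only when every task of the stream is qualified; a request is disqualified when the leader term recorded at send time no longer matches the vgroup's current term. A small sketch of that staleness test in isolation (ReqOrigin and currentTermOf are illustrative stand-ins, not the mnode types):
#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t vgId;  // vgroup the consensus request came from
  int32_t term;  // leader term recorded when the request was sent
} ReqOrigin;

// Returns true only if the originating vnode has not re-elected a leader since
// the request was sent; otherwise the request is treated as expired and skipped.
static bool consensusReqStillValid(const ReqOrigin *req,
                                   int32_t (*currentTermOf)(int32_t vgId)) {
  return currentTermOf(req->vgId) == req->term;
}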

View File

@ -427,6 +427,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
.taskId = p->id.taskId,
.checkpointId = p->checkpointInfo.latestId,
.startTs = pChkInfo->consensusTs,
.nodeId = p->nodeId,
.term = p->stage,
};
SStreamObj *pStream = NULL;
@ -486,7 +488,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pMnode != NULL) {
SArray *p = NULL;
code = mndTakeVgroupSnapshot(pMnode, &allReady, &p);
code = mndTakeVgroupSnapshot(pMnode, &allReady, &p, NULL);
taosArrayDestroy(p);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");

View File

@ -16,7 +16,7 @@
#include "mndStream.h"
#include "mndTrans.h"
#define MAX_CHKPT_EXEC_ELAPSED (600*1000) // 600s
#define MAX_CHKPT_EXEC_ELAPSED (600*1000*3) // 1800s (30min)
typedef struct SKeyInfo {
void *pKey;
@ -137,6 +137,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_CHKPT_CONSEN_NAME) == 0) ||
strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
@ -152,7 +153,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
// For a given stream:
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
// 2. create/drop/reset/update trans are conflict with any other trans.
// 2. create/drop/reset/update/chkpt-consensus trans are conflict with any other trans.
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
if (lock) {
streamMutexLock(&execInfo.lock);
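The comment above states the conflict rules this patch extends: a pending checkpoint trans tolerates only drop and reset, while create/drop/reset/update/restart and now chkpt-consensus transactions conflict with any other trans for the same stream. A condensed sketch of that rule, assuming the registered trans is looked up by name (the string literals are illustrative, not the actual MND_STREAM_* defines):
#include <stdbool.h>
#include <string.h>

// existing: name of the trans already registered for this stream
// incoming: name of the trans we are about to start
static bool streamTransConflicts(const char *existing, const char *incoming) {
  if (strcmp(existing, "stream-checkpoint") == 0) {
    // a running checkpoint only tolerates drop and task-reset
    return !(strcmp(incoming, "stream-drop") == 0 ||
             strcmp(incoming, "stream-task-reset") == 0);
  }
  // create/drop/reset/update/restart/chkpt-consensus block every other trans
  return true;
}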

View File

@ -113,8 +113,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
taosMemoryFree(pReq);
return code;
}
mDebug("set the resume action for trans:%d", pTrans->id);
return code;
}
@ -438,6 +436,8 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt
return code;
}
mDebug("transId:%d start to create resume actions", pTrans->id);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
@ -578,7 +578,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
return 0;
}
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
int32_t doSetCheckpointIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
SRestoreCheckpointInfo req = {
.taskId = pTask->id.taskId,
.streamId = pTask->id.streamId,
@ -624,7 +624,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p
return code;
}
code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, TSDB_CODE_STREAM_TASK_IVLD_STATUS, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
}
@ -632,6 +632,50 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p
return code;
}
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t checkpointId,
SArray *pList) {
SStreamTaskIter *pIter = NULL;
int32_t num = taosArrayGetSize(pList);
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
// find the required entry
int64_t startTs = 0;
for(int32_t i = 0; i < num; ++i) {
SCheckpointConsensusEntry* pEntry = taosArrayGet(pList, i);
if (pEntry->req.taskId == pTask->id.taskId) {
startTs = pEntry->req.startTs;
break;
}
}
code = doSetCheckpointIdAction(pMnode, pTrans, pTask, checkpointId, startTs);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger) {

View File

@ -181,7 +181,7 @@ static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) {
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady, SHashObj* pTermMap) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
@ -243,6 +243,14 @@ static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bo
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
}
if (pTermMap != NULL) {
int64_t term = pVgroup->vnodeGid[0].syncTerm;
code = taosHashPut(pTermMap, &pVgroup->vgId, sizeof(pVgroup->vgId), &term, sizeof(term));
if (code) {
mError("failed to put vnode:%d term into hashMap, code:%s", pVgroup->vgId, tstrerror(code));
}
}
sdbRelease(pSdb, pVgroup);
}
@ -251,7 +259,7 @@ _end:
return code;
}
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList, SHashObj* pTermMap) {
int32_t code = 0;
SArray *pVgroupList = NULL;
@ -266,7 +274,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
}
// 1. check for all vnodes status
code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady);
code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady, pTermMap);
if (code) {
goto _err;
}
@ -728,15 +736,21 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void *pIter = NULL;
int32_t code = 0;
SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
if (pDropped == NULL) {
return terrno;
}
int32_t lino = 0;
SArray *pDropped = NULL;
mDebug("start to scan checkpoint report info");
streamMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.pChkptStreams);
if (num == 0) {
goto _end;
}
pDropped = taosArrayInit(4, sizeof(int64_t));
TSDB_CHECK_NULL(pDropped, code, lino, _end, terrno);
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
SChkptReportInfo *px = (SChkptReportInfo *)pIter;
if (taosArrayGetSize(px->pTaskList) == 0) {
@ -804,42 +818,35 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
}
_end:
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pDropped);
if (pDropped != NULL) {
taosArrayDestroy(pDropped);
}
mDebug("end to scan checkpoint report info")
return TSDB_CODE_SUCCESS;
return code;
}
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
int64_t ts) {
char msg[128] = {0};
STrans *pTrans = NULL;
SStreamTask *pTask = NULL;
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, SArray* pList) {
char msg[128] = {0};
STrans *pTrans = NULL;
snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);
snprintf(msg, tListLen(msg), "set consen-chkpt-id for stream:0x%" PRIx64, pStream->uid);
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
if (pTrans == NULL || code != 0) {
return terrno;
}
STaskId id = {.streamId = pStream->uid, .taskId = taskId};
code = mndGetStreamTask(&id, pStream, &pTask);
if (code) {
mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
sdbRelease(pMnode->pSdb, pStream);
return code;
}
code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
if (code) {
sdbRelease(pMnode->pSdb, pStream);
return code;
}
code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
code = mndStreamSetChkptIdAction(pMnode, pTrans, pStream, checkpointId, pList);
if (code != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -854,8 +861,10 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
}
code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
mError("trans:%d, failed to prepare set consensus-chkptId trans for stream:0x%" PRId64 " since %s", pTrans->id,
pStream->uid, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
@ -911,13 +920,15 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
}
if (p->req.taskId == info.req.taskId) {
mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
"->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d",
mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update send reqTs %" PRId64
"->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " term:%d->%d total existed:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId,
info.req.checkpointId, num);
info.req.checkpointId, p->req.term, info.req.term, num);
p->req.startTs = info.req.startTs;
p->req.checkpointId = info.req.checkpointId;
p->req.transId = info.req.transId;
p->req.nodeId = info.req.nodeId;
p->req.term = info.req.term;
return;
}
}
@ -927,9 +938,10 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
} else {
num = taosArrayGetSize(pInfo->pTaskList);
mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
" waiting tasks:%d",
pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
mDebug("s-task:0x%x (vgId:%d) checkpointId:%" PRId64 " term:%d, reqTs:%" PRId64
" added into consensus-checkpointId list, stream:0x%" PRIx64 " waiting tasks:%d",
pRestoreInfo->taskId, pRestoreInfo->nodeId, pRestoreInfo->checkpointId, info.req.term,
info.req.startTs, pRestoreInfo->streamId, num);
}
}
@ -947,6 +959,7 @@ int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
code = taosHashRemove(pHash, &streamId, sizeof(streamId));
if (code == 0) {
numOfStreams = taosHashGetSize(pHash);
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
} else {
mError("failed to remove stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", streamId, numOfStreams);
@ -1632,7 +1645,7 @@ static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
}
}
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot, NULL);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}

View File

@ -156,7 +156,7 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id);
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id);
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
#define TQ_ERR_GO_TO_END(c) \

View File

@ -192,7 +192,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq));
TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq,
""));
{
uint64_t groupId = pDataBlock->info.id.groupId;

View File

@ -1064,11 +1064,19 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// 1. get the related stream task
code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
if (pStreamTask == NULL) {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
int32_t ret = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
if (ret == 0 && pStreamTask != NULL) {
tqWarn("s-task:0x%" PRIx64 " stopped, not ready for related task:%s scan-history work, do nothing",
pTask->streamTaskId.taskId, pTask->id.idStr);
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
}
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask);
@ -1347,10 +1355,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SDecoder decoder;
SStreamCheckpointReadyMsg req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
return code;
}
tDecoderClear(&decoder);
tqError("vgId:%d not leader, s-task:0x%x ignore the retrieve checkpoint-trigger msg from s-task:0x%x vgId:%d", vgId,
req.upstreamTaskId, req.downstreamTaskId, req.downstreamNodeId);
return TSDB_CODE_STREAM_NOT_LEADER;
}

View File

@ -41,7 +41,7 @@ static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid,
int32_t numOfTags);
static int32_t createDefaultTagColName(SArray** pColNameList);
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
const char* stbFullName, int64_t gid, bool newSubTableRule);
const char* stbFullName, int64_t gid, bool newSubTableRule, const char* id);
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
const char* id);
@ -262,7 +262,7 @@ int32_t createDefaultTagColName(SArray** pColNameList) {
}
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule) {
int64_t gid, bool newSubTableRule, const char* id) {
if (pDataBlock->info.parTbName[0]) {
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
@ -276,16 +276,17 @@ int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock*
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
tqDebug("s-task:%s gen name from:%s blockdata", id, pDataBlock->info.parTbName);
} else {
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
if (pCreateTableReq->name == NULL) {
return terrno;
}
// tqDebug("copy name:%s", pDataBlock->info.parTbName);
tqDebug("s-task:%s copy name:%s from blockdata", id, pDataBlock->info.parTbName);
}
} else {
int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
tqDebug("s-task:%s no name in blockdata, auto-created table name:%s", id, pCreateTableReq->name);
return code;
}
@ -391,7 +392,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
}
}
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask));
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask),
pTask->id.idStr);
if (code) {
goto _end;
}
@ -643,7 +645,7 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
}
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq) {
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id) {
*pReq = NULL;
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
@ -676,7 +678,8 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n
}
// set table name
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule);
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule,
id);
if (code) {
return code;
}
@ -1043,7 +1046,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq);
IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq, id);
taosArrayDestroy(pTagArray);
if (code) {
@ -1160,8 +1163,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
numOfBlocks);
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has other type block, submit one-by-one", vgId,
id, numOfBlocks);
for (int32_t i = 0; i < numOfBlocks; ++i) {
if (streamTaskShouldStop(pTask)) {

View File

@ -87,7 +87,7 @@ static void doStartScanWal(void* param, void* tmrId) {
tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
tqTrace("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId);
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
if (pMeta == NULL) {
@ -173,7 +173,7 @@ static void doStartScanWal(void* param, void* tmrId) {
_end:
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
tqDebug("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
tqTrace("vgId:%d try scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code) {

View File

@ -540,13 +540,13 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
if (!isLeader) {
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
}
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
if ((pTask == NULL) || (code != 0)) {
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
}
code = streamTaskProcessCheckRsp(pTask, &rsp);
@ -746,6 +746,9 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
// commit the update
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
if (numOfTasks == 0) {
streamMetaResetStartInfo(&pMeta->startInfo, vgId);
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
@ -786,48 +789,63 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
}
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
STaskStartInfo* pStartInfo = &pMeta->startInfo;
if (pStartInfo->startAllTasks == 1) {
// wait for the checkpoint id rsp, this rsp will be expired
if (pStartInfo->curStage == START_MARK_REQ_CHKPID) {
SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
tqInfo("vgId:%d only mark the req consensus checkpointId flag, reqTs:%"PRId64 " ignore and continue", vgId, pCurStageInfo->ts);
taosArrayClear(pStartInfo->pStagesList);
pStartInfo->curStage = 0;
goto _start;
} else if (pStartInfo->curStage == START_WAIT_FOR_CHKPTID) {
SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
tqInfo("vgId:%d already sent consensus-checkpoint msg(waiting for chkptid) expired, reqTs:%" PRId64
" rsp will be discarded",
vgId, pCurStageInfo->ts);
taosArrayClear(pStartInfo->pStagesList);
pStartInfo->curStage = 0;
goto _start;
} else if (pStartInfo->curStage == START_CHECK_DOWNSTREAM) {
pStartInfo->restartCount += 1;
tqDebug(
"vgId:%d in start tasks procedure (check downstream), inc restartCounter by 1 and wait for it completes, "
"remaining restart:%d",
vgId, pStartInfo->restartCount);
} else {
tqInfo("vgId:%d in start procedure, but not start to do anything yet, do nothing", vgId);
}
streamMetaWLock(pMeta);
if (pMeta->startInfo.startAllTasks == 1) {
pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount);
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
pMeta->startInfo.startAllTasks = 1;
streamMetaWUnLock(pMeta);
_start:
pStartInfo->startAllTasks = 1;
terrno = 0;
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);
streamMetaWLock(pMeta);
streamMetaClear(pMeta);
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
tqInfo("vgId:%d clear&close stream meta completed, elapsed time:%.3fs", vgId, el / 1000.);
streamMetaLoadAllTasks(pMeta);
{
STaskStartInfo* pStartInfo = &pMeta->startInfo;
taosHashClear(pStartInfo->pReadyTaskSet);
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->readyTs = 0;
}
if (isLeader && !tsDisableStream) {
streamMetaWUnLock(pMeta);
code = streamMetaStartAllTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
pMeta->startInfo.restartCount = 0;
streamMetaWUnLock(pMeta);
pStartInfo->restartCount = 0;
tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
}
@ -857,16 +875,20 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
streamMetaWLock(pMeta);
code = streamMetaStartAllTasks(pMeta);
streamMetaWUnLock(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
streamMetaWLock(pMeta);
code = restartStreamTasks(pMeta, isLeader);
streamMetaWUnLock(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
code = streamMetaStopAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
return code;
} else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
@ -923,7 +945,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
bool scanWal = false;
int32_t code = 0;
streamMetaWLock(pMeta);
// streamMetaWLock(pMeta);
if (pStartInfo->startAllTasks == 1) {
tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
pMeta->startInfo.restartCount);
@ -935,7 +957,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
pStartInfo->restartCount -= 1;
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
pStartInfo->restartCount);
streamMetaWUnLock(pMeta);
// streamMetaWUnLock(pMeta);
return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
} else {
@ -950,7 +972,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
}
}
streamMetaWUnLock(pMeta);
// streamMetaWUnLock(pMeta);
return code;
}
@ -1179,7 +1201,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
pTask->hTaskInfo.operatorOpen = false;
code = streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
// code = tqScanWalAsync((STQ*)handle, false);
// code = tqScanWalAsync((STQ*)handle, false);
} else {
code = streamTrySchedExec(pTask);
}
@ -1299,12 +1321,30 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL || (code != 0)) {
tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, req.taskId);
// ignore this code to avoid error code over write
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
if (ret) {
tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
// ignore this code to avoid error code over writing
if (pMeta->role == NODE_ROLE_LEADER) {
tqError("vgId:%d process consensus checkpointId req:%" PRId64
" transId:%d, failed to acquire task:0x%x, it may have been dropped/stopped already",
pMeta->vgId, req.checkpointId, req.transId, req.taskId);
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
if (ret) {
tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
}
// STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
// int32_t ret1 = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
// if (ret1 == 0 && pTask != NULL) {
// SStreamTaskState s = streamTaskGetStatus(pTask);
// if (s.state == TASK_STATUS__STOP) {
// tqDebug("s-task:0x%x status:%s wait for it become init", req.taskId, s.name);
// streamMetaReleaseTask(pMeta, pTask);
// return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
// }
// }
} else {
tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d",
pMeta->vgId, req.taskId, req.checkpointId, req.transId);
}
return 0;
@ -1312,19 +1352,26 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
// discard the rsp, since it is expired.
if (req.startTs < pTask->execInfo.created) {
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
tqWarn("s-task:%s vgId:%d createTs:%" PRId64 " recv expired consensus checkpointId:%" PRId64
" from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
pTask->execInfo.created);
streamMetaAddFailedTaskSelf(pTask, now);
if (pMeta->role == NODE_ROLE_LEADER) {
streamMetaAddFailedTaskSelf(pTask, now, true);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);
tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64
" transId:%d from mnode, reqTs:%" PRId64 " task createTs:%" PRId64,
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId, req.transId, req.startTs,
pTask->execInfo.created);
streamMutexLock(&pTask->lock);
SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
if (pTask->chkInfo.checkpointId < req.checkpointId) {
tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);
@ -1334,9 +1381,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return 0;
}
SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
if (pConsenInfo->consenChkptTransId >= req.transId) {
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
tqWarn("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
pConsenInfo->consenChkptTransId, req.transId);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
@ -1356,6 +1402,19 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
streamMutexUnlock(&pTask->lock);
streamMetaWLock(pTask->pMeta);
if (pMeta->startInfo.curStage == START_WAIT_FOR_CHKPTID) {
pMeta->startInfo.curStage = START_CHECK_DOWNSTREAM;
SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
taosArrayPush(pMeta->startInfo.pStagesList, &info);
tqDebug("vgId:%d wait_for_chkptId stage -> check_down_stream stage, reqTs:%" PRId64 " , numOfStageHist:%d",
pMeta->vgId, info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
}
streamMetaWUnLock(pTask->pMeta);
if (pMeta->role == NODE_ROLE_LEADER) {
code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
if (code) {

View File

@ -401,6 +401,12 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
}
if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) {
vInfo("vgId:%d, %u, %u, hashVal: %u, restored:%d", pVnode->config.vgId, pVnode->config.hashBegin,
pVnode->config.hashEnd, hashValue, pVnode->restored);
vError("vgId:%d invalid table name:%s, hashVal:0x%x, range [0x%x, 0x%x]", pVnode->config.vgId,
tableFName, hashValue, pVnode->config.hashBegin, pVnode->config.hashEnd);
return terrno = TSDB_CODE_VND_HASH_MISMATCH;
}

View File

@ -105,6 +105,9 @@ int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream,
int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated);
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
TSKEY compareTs(void* pKey);
void clearGroupResArray(SGroupResInfo* pGroupResInfo);
void clearSessionGroupResInfo(SGroupResInfo* pGroupResInfo);
void destroyResultWinInfo(void* pRes);
#ifdef __cplusplus
}

View File

@ -980,7 +980,7 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
}
if (waitDuration > 0) {
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0);
qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0);
} else {
qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
}
@ -1008,6 +1008,11 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
}
}
int64_t et = taosGetTimestampMs() - st;
if (et < waitDuration) {
qInfo("%s waiting %.2fs for executor stopping", GET_TASKID(pTaskInfo), et / 1000.0);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
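The hunk above gives qKillTask a bounded wait: the caller may ask for the query task to be killed and then wait up to waitDuration milliseconds for it to actually stop, logging how long the wait took. A minimal sketch of that polling pattern is shown below; it assumes a placeholder isStopped() check and a 100 ms poll interval, neither of which is part of the real executor API.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

static bool isStopped(void) { return true; }  // placeholder for the real "executor has stopped" check

static int64_t nowMs(void) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

// wait at most waitDurationMs for the executor to stop; returns 0 on success, -1 on timeout
static int32_t waitForExecStop(int64_t waitDurationMs) {
  int64_t st = nowMs();
  while (!isStopped()) {
    if (nowMs() - st >= waitDurationMs) {
      printf("executor still running after %.2fs, stop waiting\n", waitDurationMs / 1000.0);
      return -1;
    }
    struct timespec d = {.tv_sec = 0, .tv_nsec = 100 * 1000000};  // poll every 100 ms
    nanosleep(&d, NULL);
  }
  printf("executor stopped, waited %.2fs\n", (nowMs() - st) / 1000.0);
  return 0;
}

int main(void) { return waitForExecStop(10 * 60 * 1000); }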

View File

@ -58,8 +58,8 @@ void destroyStreamCountAggOperatorInfo(void* param) {
}
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
clearSessionGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyEx(pInfo->pUpdated, destroyResultWinInfo);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);

View File

@ -54,8 +54,8 @@ void destroyStreamEventOperatorInfo(void* param) {
pInfo->pOperator = NULL;
}
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
clearSessionGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyEx(pInfo->pUpdated, destroyResultWinInfo);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);

View File

@ -135,6 +135,13 @@ void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
taosMemoryFree(pFillInfo);
}
void clearGroupResArray(SGroupResInfo* pGroupResInfo) {
pGroupResInfo->freeItem = false;
taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
pGroupResInfo->index = 0;
}
static void destroyStreamFillOperatorInfo(void* param) {
SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param;
destroyStreamFillInfo(pInfo->pFillInfo);
@ -148,7 +155,7 @@ static void destroyStreamFillOperatorInfo(void* param) {
taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->matchInfo.pList = NULL;
taosArrayDestroy(pInfo->pUpdated);
clearGroupResInfo(&pInfo->groupResInfo);
clearGroupResArray(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->pCloseTs);
if (pInfo->stateStore.streamFileStateDestroy != NULL) {

View File

@ -166,7 +166,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
cleanupExprSupp(&pInfo->scalarSup);
taosArrayDestroy(pInfo->historyPoints);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
taosArrayDestroy(pInfo->pUpdated);
pInfo->pUpdated = NULL;
tSimpleHashCleanup(pInfo->pUpdatedMap);
@ -174,7 +174,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pDelWins);
tSimpleHashCleanup(pInfo->pDeletedMap);
clearGroupResInfo(&pInfo->groupResInfo);
clearGroupResArray(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);

View File

@ -486,6 +486,7 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
destroyFlusedPos(pPos);
}
}
pGroupResInfo->freeItem = false;
taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
@ -2132,6 +2133,27 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
taosMemoryFreeClear(pSup->pDummyCtx);
}
void destroyResultWinInfo(void* pRes) {
SResultWindowInfo* pWinRes = (SResultWindowInfo*)pRes;
destroyFlusedPos(pWinRes->pStatePos);
}
void clearSessionGroupResInfo(SGroupResInfo* pGroupResInfo) {
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
if (pGroupResInfo->index >= 0 && pGroupResInfo->index < size) {
for (int32_t i = pGroupResInfo->index; i < size; i++) {
SResultWindowInfo* pRes = (SResultWindowInfo*) taosArrayGet(pGroupResInfo->pRows, i);
destroyFlusedPos(pRes->pStatePos);
pRes->pStatePos = NULL;
}
}
pGroupResInfo->freeItem = false;
taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
pGroupResInfo->index = 0;
}
void destroyStreamSessionAggOperatorInfo(void* param) {
if (param == NULL) {
return;
@ -2145,8 +2167,8 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
}
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
clearSessionGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyEx(pInfo->pUpdated, destroyResultWinInfo);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
@ -4255,8 +4277,8 @@ void destroyStreamStateOperatorInfo(void* param) {
pInfo->pOperator = NULL;
}
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
clearSessionGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyEx(pInfo->pUpdated, destroyResultWinInfo);
pInfo->pUpdated = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);

View File

@ -44,7 +44,7 @@ typedef struct {
TdThreadMutex cfMutex;
SHashObj* cfInst;
int64_t defaultCfInit;
int64_t vgId;
} SBackendWrapper;
typedef struct {

View File

@ -843,6 +843,8 @@ int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId,
pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno);
pHandle->vgId = vgId;
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
@ -914,6 +916,7 @@ _EXIT:
taosMemoryFree(backendPath);
return code;
}
void streamBackendCleanup(void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)arg;
@ -930,6 +933,7 @@ void streamBackendCleanup(void* arg) {
rocksdb_close(pHandle->db);
pHandle->db = NULL;
}
rocksdb_options_destroy(pHandle->dbOpt);
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache);
@ -945,16 +949,16 @@ void streamBackendCleanup(void* arg) {
streamMutexDestroy(&pHandle->mutex);
streamMutexDestroy(&pHandle->cfMutex);
stDebug("destroy stream backend :%p", pHandle);
stDebug("vgId:%d destroy stream backend:%p", (int32_t) pHandle->vgId, pHandle);
taosMemoryFree(pHandle);
return;
}
void streamBackendHandleCleanup(void* arg) {
SBackendCfWrapper* wrapper = arg;
bool remove = wrapper->remove;
TAOS_UNUSED(taosThreadRwlockWrlock(&wrapper->rwLock));
stDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
stDebug("start to do-close backendWrapper %p, %s", wrapper, wrapper->idstr);
if (wrapper->rocksdb == NULL) {
TAOS_UNUSED(taosThreadRwlockUnlock(&wrapper->rwLock));
return;
@ -2613,11 +2617,14 @@ int32_t taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t*
void taskDbDestroy(void* pDb, bool flush) {
STaskDbWrapper* wrapper = pDb;
if (wrapper == NULL) return;
if (wrapper == NULL) {
return;
}
int64_t st = taosGetTimestampMs();
streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
stDebug("succ to destroy stream backend:%p", wrapper);
stDebug("%s succ to destroy stream backend:%p", wrapper->idstr, wrapper);
int8_t nCf = tListLen(ginitDict);
if (flush && wrapper->removeAllFiles == 0) {
@ -2674,25 +2681,26 @@ void taskDbDestroy(void* pDb, bool flush) {
rocksdb_comparator_destroy(compare);
rocksdb_block_based_options_destroy(tblOpt);
}
taosMemoryFree(wrapper->pCompares);
taosMemoryFree(wrapper->pCfOpts);
taosMemoryFree(wrapper->pCfParams);
streamMutexDestroy(&wrapper->mutex);
taskDbDestroyChkpOpt(wrapper);
taosMemoryFree(wrapper->idstr);
if (wrapper->removeAllFiles) {
char* err = NULL;
stInfo("drop task remove backend dat:%s", wrapper->path);
stInfo("drop task remove backend data:%s", wrapper->path);
taosRemoveDir(wrapper->path);
}
int64_t et = taosGetTimestampMs();
stDebug("%s destroy stream backend:%p completed, elapsed time:%.2fs", wrapper->idstr, wrapper, (et - st)/1000.0);
taosMemoryFree(wrapper->idstr);
taosMemoryFree(wrapper->path);
taosMemoryFree(wrapper);
return;
}
void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }
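Part of the fix in this file is ordering: the closing stDebug still prints wrapper->idstr, so taosMemoryFree(wrapper->idstr) is moved below that log line instead of running before it. A small sketch of the rule (finish every log that references a buffer before releasing it); Wrapper and the printf call are stand-ins for the real types and logger.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
  char *idstr;
  char *path;
} Wrapper;

static void wrapperDestroy(Wrapper *w) {
  if (w == NULL) return;
  // the last log line that dereferences idstr must run before the free below
  printf("%s destroy stream backend completed\n", w->idstr);
  free(w->idstr);  // safe: idstr is no longer used after this point
  free(w->path);
  free(w);
}

int main(void) {
  Wrapper *w = calloc(1, sizeof(Wrapper));
  if (w == NULL) return 1;
  w->idstr = strdup("0x1-0x2");
  w->path = strdup("/tmp/stream-backend");
  wrapperDestroy(w);
  return 0;
}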

View File

@ -20,7 +20,7 @@
#define CHECK_NOT_RSP_DURATION 60 * 1000 // 60 sec
static void processDownstreamReadyRsp(SStreamTask* pTask);
static void processDownstreamReadyRsp(SStreamTask* pTask, bool lock);
static void rspMonitorFn(void* param, void* tmrId);
static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
@ -133,9 +133,11 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // for sink task, set it ready directly.
// streamTaskSetConsenChkptIdRecv(pTask, 0, taosGetTimestampMs());
//
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
processDownstreamReadyRsp(pTask);
processDownstreamReadyRsp(pTask, false);
}
if (code) {
@ -208,7 +210,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
}
if (left == 0) {
processDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
processDownstreamReadyRsp(pTask, true); // all downstream tasks are ready, set the complete check downstream flag
streamTaskStopMonitorCheckRsp(pInfo, id);
} else {
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
@ -234,7 +236,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
streamMetaAddFailedTaskSelf(pTask, now);
streamMetaAddFailedTaskSelf(pTask, now, true);
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
@ -331,7 +333,7 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void processDownstreamReadyRsp(SStreamTask* pTask) {
void processDownstreamReadyRsp(SStreamTask* pTask, bool lock) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
if (code) {
@ -340,7 +342,12 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
int64_t checkTs = pTask->execInfo.checkTs;
int64_t readyTs = pTask->execInfo.readyTs;
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
if (lock) {
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
} else {
code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
}
if (code) {
stError("s-task:%s failed to record the downstream task status, code:%s", pTask->id.idStr, tstrerror(code));
}
@ -351,7 +358,7 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
pTask->info.fillHistory);
}
// halt it self for count window stream task until the related fill history task completed.
// halt itself for count window stream task until the related fill history task completed.
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
@ -365,7 +372,7 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
// todo: let's retry
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
code = streamLaunchFillHistoryTask(pTask);
code = streamLaunchFillHistoryTask(pTask, lock);
if (code) {
stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
}

View File

@ -629,8 +629,9 @@ static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SV
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
stDebug("s-task:%s vgId:%d related fill-history task:0x%" PRIx64
" dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->hTaskId, numOfTasks);
//todo: task may not exist, commit anyway, optimize this later
code = streamMetaCommit(pMeta);
@ -1586,18 +1587,27 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
streamMutexUnlock(&pTask->lock);
// 1. stop the executor at first
if (pTask->exec.pExecutor != NULL) {
// we need to make sure the underlying operator is stopped right, otherwise, SIGSEG may occur,
// waiting at most for 10min
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
int32_t code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 600000);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
}
}
qDestroyTask(pTask->exec.pExecutor);
pTask->exec.pExecutor = NULL;
}
// 2. destroy the backend after the executor has stopped
if (pTask->pBackend != NULL) {
streamFreeTaskState(pTask, p);
pTask->pBackend = NULL;
}
streamMetaWLock(pTask->pMeta);
if (pTask->exec.pExecutor != NULL) {
qDestroyTask(pTask->exec.pExecutor);
pTask->exec.pExecutor = NULL;
}
streamMetaWUnLock(pTask->pMeta);
return 0;
}
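The reordered shutdown above is: kill the query executor first and wait (bounded) for it to stop, then release the state backend, and only then destroy the executor handle under the meta write lock, so a concurrent restart never touches a half-freed operator tree. A schematic sketch of that ordering follows; every function in it is a placeholder for the corresponding stream/executor API, not the real signature.

#include <stdbool.h>
#include <stdio.h>

// placeholders for the real APIs; each returns 0 on success
static int killExecutor(void *exec, long waitMs) { (void)exec; (void)waitMs; return 0; }
static void freeBackend(void **backend) { *backend = NULL; }
static void destroyExecutor(void **exec) { *exec = NULL; }
static void metaWLock(void) {}
static void metaWUnlock(void) {}

typedef struct {
  void *executor;
  void *backend;
  bool  isSinkTask;
} Task;

static int shutdownForRestart(Task *t) {
  // 1. stop the running query first, waiting at most 10 minutes; freeing the
  //    backend while the executor still runs on top of it may crash
  if (!t->isSinkTask && t->executor != NULL) {
    if (killExecutor(t->executor, 10L * 60 * 1000) != 0) {
      printf("failed to stop executor, continue teardown anyway\n");
    }
  }

  // 2. release the state backend only after the executor has stopped using it
  if (t->backend != NULL) {
    freeBackend(&t->backend);
  }

  // 3. destroy the executor handle under the meta lock, so a concurrent
  //    start/expand of the same task cannot race with the destruction
  metaWLock();
  if (t->executor != NULL) {
    destroyExecutor(&t->executor);
  }
  metaWUnlock();
  return 0;
}

int main(void) {
  Task t = {.executor = (void *)&t, .backend = (void *)&t, .isSinkTask = false};
  return shutdownForRestart(&t);
}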

View File

@ -145,7 +145,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req) {
SRetrieveTableRsp* pRetrieve = NULL;
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
int32_t len = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
pRetrieve = taosMemoryCalloc(1, len);
@ -684,6 +684,9 @@ static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs,
}
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
stDebug("s-task:%s dst table hashVal:0x%x assign to vgId:%d range[0x%x, 0x%x]", pTask->id.idStr, hashValue,
pVgInfo->vgId, pVgInfo->hashBegin, pVgInfo->hashEnd);
if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) {
stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno));
return code;
@ -727,6 +730,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
if (!pDataBlock->info.parTbName[0]) {
memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
stDebug("s-task:%s cached table name:%s, groupId:%" PRId64 " hashVal:0x%x", pTask->id.idStr, pBln->parTbName,
groupId, hashValue);
}
} else {
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
@ -752,9 +757,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
}
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
snprintf(ctbName, TSDB_TABLE_FNAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
pDataBlock->info.parTbName);
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
@ -762,6 +767,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
stDebug("s-task:%s dst table:%s hashVal:0x%x groupId:%"PRId64, pTask->id.idStr, ctbName, hashValue, groupId);
// failed to put into name buffer, no need to do anything
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing
code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
@ -890,7 +897,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} else {
streamMutexLock(&pTask->msgInfo.lock);
if (pTask->msgInfo.inMonitor == 0) {
// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
tstrerror(code));
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
@ -967,8 +974,8 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n
}
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, vgId,
(int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num);
return -1;
}
@ -1128,8 +1135,7 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
// 1. check status in the first place
if (state.state != TASK_STATUS__CK) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready", id, vgId,
state.name);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready", id, vgId, state.name);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
@ -1258,7 +1264,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
}
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {

View File

@ -195,6 +195,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SMetaHbInfo* pInfo = pMeta->pHbInfo;
int32_t code = 0;
bool setReqCheckpointId = false;
// not recv the hb msg rsp yet, send current hb msg again
if (pInfo->msgSendTs > 0) {
@ -243,7 +244,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
continue;
}
// todo: this lock may blocked by lock in streamMetaStartOneTask function, which may lock a very long time when
// todo: this lock may be blocked by lock in streamMetaStartOneTask function, which may lock a very long time when
// trying to load remote checkpoint data
streamMutexLock(&pTask->lock);
STaskStatusEntry entry = streamTaskGetStatusEntry(pTask);
@ -274,7 +275,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
streamMutexLock(&pTask->lock);
entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(pTask, pMsg->ts);
if (entry.checkpointInfo.consensusChkptId) {
entry.checkpointInfo.consensusTs = pMsg->ts;
entry.checkpointInfo.consensusTs = pTask->status.consenChkptInfo.statusTs;
setReqCheckpointId = true;
}
streamMutexUnlock(&pTask->lock);
@ -294,6 +296,20 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
streamMetaReleaseTask(pMeta, pTask);
}
if (setReqCheckpointId) {
if (pMeta->startInfo.curStage != START_MARK_REQ_CHKPID) {
stError("vgId:%d internal unknown error, current stage is:%d expected:%d", pMeta->vgId, pMeta->startInfo.curStage,
START_MARK_REQ_CHKPID);
}
pMeta->startInfo.curStage = START_WAIT_FOR_CHKPTID;
SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = pMsg->ts};
taosArrayPush(pMeta->startInfo.pStagesList, &info);
stDebug("vgId:%d mark_req stage -> wait_for_chkptId stage, reqTs:%" PRId64 " , numOfStageHist:%d", pMeta->vgId,
info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
}
pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus);
if (hasMnodeEpset) {
@ -317,7 +333,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, rid);
if (pMeta == NULL) {
stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", rid);
// taosMemoryFree(param);
return;
}
@ -345,7 +360,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
} else {
stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid);
}
// taosMemoryFree(param);
return;
}
@ -381,7 +395,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
}
if (!send) {
stError("vgId:%d failed to send hmMsg to mnode, retry again in 5s, code:%s", pMeta->vgId, tstrerror(code));
stError("vgId:%d failed to send hbMsg to mnode due to acquire lock failure, retry again in 5s", pMeta->vgId);
}
if (code) {
stError("vgId:%d failed to send hbMsg to mnode, retry in 5, code:%s", pMeta->vgId, tstrerror(code));
}
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,

View File

@ -239,7 +239,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
void* key = taosHashGetKey(pIter, NULL);
code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
if (code != 0) {
stError("failed to cvt data");
stError("vgId:%d failed to cvt data", pMeta->vgId);
goto _EXIT;
}
@ -495,6 +495,7 @@ _err:
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);
if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList);
taosMemoryFree(pMeta);
stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
@ -526,7 +527,9 @@ void streamMetaInitBackend(SStreamMeta* pMeta) {
void streamMetaClear(SStreamMeta* pMeta) {
// remove all existing tasks in this vnode
void* pIter = NULL;
int64_t st = taosGetTimestampMs();
void* pIter = NULL;
while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) {
int64_t refId = *(int64_t*)pIter;
SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId);
@ -552,6 +555,9 @@ void streamMetaClear(SStreamMeta* pMeta) {
}
}
int64_t et = taosGetTimestampMs();
stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (et - st)/1000.0);
if (pMeta->streamBackendRid != 0) {
int32_t code = taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
if (code) {
@ -559,6 +565,9 @@ void streamMetaClear(SStreamMeta* pMeta) {
}
}
int64_t et1 = taosGetTimestampMs();
stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (et1 - et)/1000.0);
taosHashClear(pMeta->pTasksMap);
taosArrayClear(pMeta->pTaskList);
@ -571,6 +580,8 @@ void streamMetaClear(SStreamMeta* pMeta) {
// the willrestart/starting flag can NOT be cleared
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
taosArrayClear(pMeta->startInfo.pStagesList);
pMeta->startInfo.readyTs = 0;
}

View File

@ -25,9 +25,9 @@
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)
typedef struct SLaunchHTaskInfo {
int64_t metaRid;
STaskId id;
STaskId hTaskId;
int64_t metaRid;
STaskId id;
STaskId hTaskId;
} SLaunchHTaskInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
@ -40,7 +40,7 @@ static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask);
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask);
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask, bool lock);
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
@ -122,7 +122,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
int32_t code = 0;
code = streamTaskSetReady(pTask);
if (code) {
@ -192,7 +192,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
}
// a fill-history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask, bool lock) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
@ -200,29 +200,44 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
int64_t now = taosGetTimestampMs();
int32_t code = 0;
SStreamTask* pHisTask = NULL;
// check stream task status in the first place.
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT &&
pStatus.state != TASK_STATUS__PAUSE) {
SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__READY && status.state != TASK_STATUS__HALT && status.state != TASK_STATUS__PAUSE) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus.name);
return streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
status.name);
if (lock) {
return streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
return streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs,
false);
}
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execution conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask* pHisTask = NULL;
code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHisTask);
streamMetaRUnLock(pMeta);
if (lock) {
streamMetaRLock(pMeta);
}
if (code == 0) { // it is already added into stream meta store.
code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHisTask);
if (lock) {
streamMetaRUnLock(pMeta);
}
if (code == 0) { // it is already added into stream meta store.
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
if (lock) {
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else {
code = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs,
true);
}
if (code) {
stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code));
}
@ -230,7 +245,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
if (pHisTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pHisTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHisTask, now);
streamMetaAddFailedTaskSelf(pHisTask, now, lock);
stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code));
}
}
@ -243,7 +258,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
streamMetaReleaseTask(pMeta, pHisTask);
return code;
} else {
return launchNotBuiltFillHistoryTask(pTask);
return launchNotBuiltFillHistoryTask(pTask, lock);
}
}
@ -281,14 +296,14 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo,
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
if (code) {
stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
} else {
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x", pTask->id.idStr,
MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId);
}
pHTaskInfo->id.taskId = 0;
@ -300,7 +315,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
if (streamTaskShouldStop(pTask)) { // record the failure
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId,
pInfo->hTaskId.taskId);
@ -328,7 +343,7 @@ static void doCleanup(SStreamTask* pTask, int64_t metaRid, SLaunchHTaskInfo* pIn
streamMetaReleaseTask(pMeta, pTask);
int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid);
if (ret) {
stError("vgId:%d failed to release meta refId:%"PRId64, vgId, metaRid);
stError("vgId:%d failed to release meta refId:%" PRId64, vgId, metaRid);
}
if (pInfo != NULL) {
@ -363,7 +378,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid);
if (ret) {
stError("vgId:%d failed to release meta refId:%"PRId64, vgId, metaRid);
stError("vgId:%d failed to release meta refId:%" PRId64, vgId, metaRid);
}
// already dropped, no need to set the failure info into the stream task meta.
@ -416,7 +431,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
if (pHTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pHTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHTask, now);
streamMetaAddFailedTaskSelf(pHTask, now, true);
stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code));
}
}
@ -451,13 +466,14 @@ int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStr
return TSDB_CODE_SUCCESS;
}
int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask, bool lock) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
SLaunchHTaskInfo* pInfo = NULL;
int32_t ret = 0;
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId);
@ -465,10 +481,16 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
int32_t code = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId, &pInfo);
if (code) {
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
int32_t ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
if (lock) {
ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
ret = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
}
if (ret) {
stError("s-task:%s add task check downstream result failed, code:%s", idStr, tstrerror(ret));
}
return code;
}
@ -483,7 +505,13 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
stError("s-task:%s failed to start timer, related fill-history task not launched", idStr);
taosMemoryFree(pInfo);
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
if (lock) {
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
code = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
}
if (code) {
stError("s-task:0x%x failed to record the start task status, code:%s", hTaskId, tstrerror(code));
}
@ -508,8 +536,8 @@ int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) {
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVer) {
SVersionRange* pRange = &pTask->dataRange.range;
if (nextProcessVer < pRange->maxVer) {
stError("s-task:%s next processdVer:%"PRId64" is less than range max ver:%"PRId64, pTask->id.idStr, nextProcessVer,
pRange->maxVer);
stError("s-task:%s next processdVer:%" PRId64 " is less than range max ver:%" PRId64, pTask->id.idStr,
nextProcessVer, pRange->maxVer);
return true;
}
@ -570,7 +598,7 @@ int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
int64_t taskRefId = *(int64_t*) param;
int64_t taskRefId = *(int64_t*)param;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
@ -595,8 +623,7 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
stError("s-task:%s async start history task failed", pTask->id.idStr);
}
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr", pTask->id.idStr,
pTask->info.fillHistory);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr", pTask->id.idStr, pTask->info.fillHistory);
} else {
int64_t* pTaskRefId = NULL;
int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId);

View File

@ -39,19 +39,18 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
int64_t now = taosGetTimestampMs();
SArray* pTaskList = NULL;
int32_t numOfConsensusChkptIdTasks = 0;
int32_t numOfTasks = 0;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
streamMetaWLock(pMeta);
streamMetaResetStartInfo(&pMeta->startInfo, vgId);
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS; // ignore the error and return directly
@ -65,10 +64,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if ((pTask == NULL) || (code != 0)) {
stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId);
int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
if (ret) {
stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
}
@ -79,7 +79,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
code = pMeta->expandTaskFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs, false);
}
}
@ -91,10 +91,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if ((pTask == NULL )|| (code != 0)) {
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if ((pTask == NULL) || (code != 0)) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
if (ret) {
stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
}
@ -116,14 +116,14 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
code = streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
code = streamLaunchFillHistoryTask(pTask, false); // todo: how about retry launch fill-history task?
if (code) {
stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
}
}
code = streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
true);
code = streamMetaAddTaskLaunchResultNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs,
pInfo->readyTs, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
@ -136,7 +136,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
}
}
@ -146,11 +146,23 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
// negotiate the consensus checkpoint id for current task
code = streamTaskSendNegotiateChkptIdMsg(pTask);
if (code == 0) {
numOfConsensusChkptIdTasks += 1;
}
// this task may has no checkpoint, but others tasks may generate checkpoint already?
// this task may have no checkpoint, but others tasks may generate checkpoint already?
streamMetaReleaseTask(pMeta, pTask);
}
if (numOfConsensusChkptIdTasks > 0) {
pMeta->startInfo.curStage = START_MARK_REQ_CHKPID;
SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
taosArrayPush(pMeta->startInfo.pStagesList, &info);
stDebug("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64 " numOfStageHist:%d", pMeta->vgId, numOfConsensusChkptIdTasks,
info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
}
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks being started without
// initialization when the check-downstream-status operation completes too quickly.
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
@ -159,54 +171,76 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
}
int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
streamMetaWLock(pMeta);
STaskStartInfo* pInfo = &pMeta->startInfo;
if (pMeta->closeFlag) {
streamMetaWUnLock(pMeta);
stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
return TSDB_CODE_FAILED;
}
*pList = taosArrayDup(pMeta->pTaskList, NULL);
if (*pList == NULL) {
stError("vgId:%d failed to dup tasklist, before restart tasks, code:%s", pMeta->vgId, tstrerror(terrno));
return terrno;
}
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = now;
taosHashClear(pInfo->pReadyTaskSet);
taosHashClear(pInfo->pFailedTaskSet);
taosArrayClear(pInfo->pStagesList);
pInfo->curStage = 0;
pInfo->startTs = now;
int32_t code = streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
return code;
}
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
taosHashClear(pStartInfo->pReadyTaskSet);
taosHashClear(pStartInfo->pFailedTaskSet);
taosArrayClear(pStartInfo->pStagesList);
pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0;
pStartInfo->elapsedTime = 0;
pStartInfo->curStage = 0;
// reset the sentinel flag value to be 0
pStartInfo->startAllTasks = 0;
stDebug("vgId:%d clear start-all-task info", vgId);
}
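The start procedure now keeps a timestamped stage history (mark_req -> wait_for_chkptId -> check_downstream) in pStagesList, which streamMetaLogLaunchTasksInfo dumps once every task has answered, so a slow restart can be attributed to a concrete stage. A minimal, self-contained sketch of that bookkeeping, with a fixed-size array standing in for the SArray used in the real code:

#include <stdint.h>
#include <stdio.h>
#include <time.h>

typedef enum {
  STAGE_MARK_REQ_CHKPID  = 1,
  STAGE_WAIT_FOR_CHKPTID = 2,
  STAGE_CHECK_DOWNSTREAM = 3,
} EStage;

typedef struct {
  EStage  stage;
  int64_t ts;
} StageInfo;

typedef struct {
  StageInfo hist[8];  // history of stage transitions with timestamps
  int32_t   num;
  EStage    cur;
} StartInfo;

static int64_t nowMs(void) { return (int64_t)time(NULL) * 1000; }

// record a stage transition together with the time it happened
static void advanceStage(StartInfo *p, EStage next) {
  p->cur = next;
  if (p->num < 8) {
    p->hist[p->num].stage = next;
    p->hist[p->num].ts = nowMs();
    p->num++;
  }
}

// dump the recorded history once all tasks finished the check-downstream step
static void dumpStages(const StartInfo *p) {
  for (int32_t i = 0; i < p->num; ++i) {
    printf("stage:%d ts:%lld\n", (int)p->hist[i].stage, (long long)p->hist[i].ts);
  }
}

int main(void) {
  StartInfo info = {0};
  advanceStage(&info, STAGE_MARK_REQ_CHKPID);   // hb marks the consensus-chkptId request
  advanceStage(&info, STAGE_WAIT_FOR_CHKPTID);  // waiting for the mnode to answer
  advanceStage(&info, STAGE_CHECK_DOWNSTREAM);  // consensus id received, start downstream check
  dumpStages(&info);
  return 0;
}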
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
static void streamMetaLogLaunchTasksInfo(SStreamMeta* pMeta, int32_t numOfTotal, int32_t taskId, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
for (int32_t i = 0; i < taosArrayGetSize(pStartInfo->pStagesList); ++i) {
SStartTaskStageInfo* pStageInfo = taosArrayGet(pStartInfo->pStagesList, i);
stDebug("vgId:%d start task procedure, stage:%d, ts:%" PRId64, pMeta->vgId, pStageInfo->stage, pStageInfo->ts);
}
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
}
int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId,
int64_t startTs, int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
int32_t vgId = pMeta->vgId;
bool allRsp = true;
SStreamTask* p = NULL;
streamMetaWLock(pMeta);
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &p);
if (code != 0) { // task does not exist in current vnode, not record the complete info
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
@ -218,7 +252,6 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
"vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
"time:%" PRId64 "ms",
vgId, taskId, ready, el);
streamMetaWUnLock(pMeta);
return 0;
}
@ -230,35 +263,24 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
" already exist start results in meta start task result hashmap",
vgId, id.taskId);
code = 0;
} else {
stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed, code:%s", vgId,
id.taskId, tstrerror(code));
}
streamMetaWUnLock(pMeta);
return code;
}
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
int32_t numOfSucc = taosHashGetSize(pStartInfo->pReadyTaskSet);
int32_t numOfRecv = numOfSucc + taosHashGetSize(pStartInfo->pFailedTaskSet);
allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal);
if (allRsp) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaLogLaunchTasksInfo(pMeta, numOfTotal, taskId, ready);
streamMetaResetStartInfo(pStartInfo, vgId);
streamMetaWUnLock(pMeta);
code = pStartInfo->completeFn(pMeta);
} else {
streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
numOfRecv, numOfTotal);
}
@ -266,6 +288,17 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
return code;
}
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
int32_t code = 0;
streamMetaWLock(pMeta);
code = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, endTs, ready);
streamMetaWUnLock(pMeta);
return code;
}
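streamMetaAddTaskLaunchResultNoLock assumes its caller already holds the meta write lock, and streamMetaAddTaskLaunchResult above is a thin wrapper that takes the lock itself; the split is what lets streamMetaStartAllTasks record results from inside its own critical section without self-deadlocking on the non-recursive rwlock. A minimal sketch of that pattern with a plain pthread rwlock; the names are illustrative only, not the library API.

#include <pthread.h>
#include <stdio.h>

typedef struct {
  pthread_rwlock_t lock;
  int              counter;  // stands in for the ready/failed task hashmaps
} Meta;

// caller must already hold the write lock
static int addResultNoLock(Meta *m, int ready) {
  m->counter += 1;
  printf("recorded result, ready:%d total:%d\n", ready, m->counter);
  return 0;
}

// locking wrapper for callers outside any critical section
static int addResult(Meta *m, int ready) {
  pthread_rwlock_wrlock(&m->lock);
  int code = addResultNoLock(m, ready);
  pthread_rwlock_unlock(&m->lock);
  return code;
}

int main(void) {
  Meta m = {.counter = 0};
  pthread_rwlock_init(&m.lock, NULL);

  addResult(&m, 1);  // outside the lock: use the wrapper

  pthread_rwlock_wrlock(&m.lock);  // inside an existing critical section:
  addResultNoLock(&m, 0);          // use the NoLock variant; calling addResult()
  pthread_rwlock_unlock(&m.lock);  // here could deadlock, the rwlock is not recursive

  pthread_rwlock_destroy(&m.lock);
  return 0;
}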
// check that rsp has been received from all existing tasks
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
for (int32_t i = 0; i < numOfTotal; ++i) {
@ -279,6 +312,7 @@ bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32
if (px == NULL) {
px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
if (px == NULL) {
stDebug("vgId:%d s-task:0x%x start result not rsp yet", pMeta->vgId, (int32_t) idx.taskId);
return false;
}
}
@ -292,7 +326,7 @@ void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
void* pIter = NULL;
size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
stInfo("vgId:%d %d tasks complete check-downstream, %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
@ -323,12 +357,19 @@ int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
return terrno;
}
pStartInfo->pStagesList = taosArrayInit(4, sizeof(SStartTaskStageInfo));
if (pStartInfo->pStagesList == NULL) {
return terrno;
}
return 0;
}
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
taosHashCleanup(pStartInfo->pReadyTaskSet);
taosHashCleanup(pStartInfo->pFailedTaskSet);
taosArrayDestroy(pStartInfo->pStagesList);
pStartInfo->readyTs = 0;
pStartInfo->elapsedTime = 0;
pStartInfo->startTs = 0;
@ -348,7 +389,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
if ((pTask == NULL) || (code != 0)) {
stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId);
int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId, true);
if (ret) {
stError("s-task:0x%x add check downstream failed, core:%s", taskId, tstrerror(ret));
}
@ -365,7 +406,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
}
// the start all tasks procedure may happen to start the newly deployed stream task, and results in the
// concurrently start this task by two threads.
// concurrent start this task by two threads.
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
@ -382,12 +423,14 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if(pTask->status.downstreamReady != 0) {
if (pTask->status.downstreamReady != 0) {
stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
streamMetaWLock(pMeta);
// avoid initialization and destroy running concurrently.
streamMutexLock(&pTask->lock);
if (pTask->pBackend == NULL) {
@ -395,7 +438,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
streamMutexUnlock(&pTask->lock);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
}
} else {
streamMutexUnlock(&pTask->lock);
@ -410,12 +453,14 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
}
}
}
streamMetaWUnLock(pMeta);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
@ -470,26 +515,21 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;
int32_t vgId = pTask->pMeta->vgId;
int32_t vgId = pTask->pMeta->vgId;
if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) {
// mark the sending of req consensus checkpoint request.
pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND;
pConChkptInfo->statusTs = ts;
stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr,
vgId, pConChkptInfo->statusTs);
return 1;
} else {
int32_t el = (ts - pConChkptInfo->statusTs) / 1000;
// not recv consensus-checkpoint rsp for 60sec, send it again in hb to mnode
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) {
pConChkptInfo->statusTs = ts;
stWarn(
"s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64,
pTask->id.idStr, vgId, el, pConChkptInfo->statusTs);
if (pTask->pMeta->startInfo.curStage == START_MARK_REQ_CHKPID) {
if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) {
// mark the sending of req consensus checkpoint request.
pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND;
stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, vgId,
pConChkptInfo->statusTs);
return 1;
} else if (pConChkptInfo->status == 0) {
stDebug("vgId:%d s-task:%s not need to set the req checkpointId, current stage:%d", vgId, pTask->id.idStr,
pConChkptInfo->status);
} else {
stWarn("vgId:%d, s-task:%s restart procedure expired, start stage:%d", vgId, pTask->id.idStr,
pConChkptInfo->status);
}
}
@ -513,10 +553,11 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
pInfo->statusTs = ts;
pInfo->consenChkptTransId = 0;
stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64 ", task created ts:%" PRId64,
pTask->id.idStr, prevTrans, ts, pTask->execInfo.created);
}
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, bool lock) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
@ -527,7 +568,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
if (lock) {
streamMetaRLock(pMeta);
}
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) {
@ -536,15 +579,26 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
hId = pTask->hTaskInfo.id;
streamMetaReleaseTask(pMeta, pTask);
streamMetaRUnLock(pMeta);
if (lock) {
streamMetaRUnLock(pMeta);
}
// add the failed task info, along with the related fill-history task info into tasks list.
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
if (lock) {
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
code = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
code = streamMetaAddTaskLaunchResultNoLock(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
}
} else {
streamMetaRUnLock(pMeta);
if (lock) {
streamMetaRUnLock(pMeta);
}
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
streamId, taskId, pMeta->vgId);
@ -554,9 +608,17 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
return code;
}
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs, bool lock) {
int32_t startTs = pTask->execInfo.checkTs;
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
int32_t code = 0;
if (lock) {
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
} else {
code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs,
false);
}
if (code) {
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
}
@ -564,7 +626,13 @@ void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
if (lock) {
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
} else {
code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
}
if (code) {
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
}

View File

@ -99,11 +99,12 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
}
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamTask* pStreamTask = pTask;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
stDebug("s-task:%s open stream state %p, %s", pStreamTask->id.idStr, pState, path);
if (pState == NULL) {
code = terrno;
@ -117,7 +118,6 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
QUERY_CHECK_CODE(code, lino, _end);
}
SStreamTask* pStreamTask = pTask;
pState->streamId = streamId;
pState->taskId = taskId;
TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x",
@ -133,8 +133,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
pState->parNameMap = tSimpleHashInit(1024, hashFn);
QUERY_CHECK_NULL(pState->parNameMap, code, lino, _end, terrno);
stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId,
pState->taskId);
stInfo("s-task:%s open state %p on backend %p 0x%" PRIx64 "-%d succ", pStreamTask->id.idStr, pState,
pMeta->streamBackend, pState->streamId, pState->taskId);
return pState;
_end:

View File

@ -15,21 +15,21 @@
#include "streamInt.h"
void streamMutexLock(TdThreadMutex *pMutex) {
void streamMutexLock(TdThreadMutex* pMutex) {
int32_t code = taosThreadMutexLock(pMutex);
if (code) {
stError("%p mutex lock failed, code:%s", pMutex, tstrerror(code));
}
}
void streamMutexUnlock(TdThreadMutex *pMutex) {
void streamMutexUnlock(TdThreadMutex* pMutex) {
int32_t code = taosThreadMutexUnlock(pMutex);
if (code) {
stError("%p mutex unlock failed, code:%s", pMutex, tstrerror(code));
}
}
void streamMutexDestroy(TdThreadMutex *pMutex) {
void streamMutexDestroy(TdThreadMutex* pMutex) {
int32_t code = taosThreadMutexDestroy(pMutex);
if (code) {
stError("%p mutex destroy, code:%s", pMutex, tstrerror(code));
@ -37,7 +37,7 @@ void streamMutexDestroy(TdThreadMutex *pMutex) {
}
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
if (code) {
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
@ -45,7 +45,7 @@ void streamMetaRLock(SStreamMeta* pMeta) {
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
@ -57,14 +57,16 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
int32_t streamMetaTryRlock(SStreamMeta* pMeta) {
int32_t code = taosThreadRwlockTryRdlock(&pMeta->lock);
if (code) {
stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
if (code != TAOS_SYSTEM_ERROR(EBUSY)) {
stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
}
}
return code;
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
@ -72,7 +74,7 @@ void streamMetaWLock(SStreamMeta* pMeta) {
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
@ -94,5 +96,5 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName,
}
int32_t streamGetFatalError(const SStreamMeta* pMeta) {
return atomic_load_32((volatile int32_t*) &pMeta->fatalInfo.code);
return atomic_load_32((volatile int32_t*)&pMeta->fatalInfo.code);
}
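streamMetaTryRlock now stays silent when the try-lock comes back with EBUSY, because a busy lock is the expected outcome of a try-lock rather than an error; only other failure codes are logged. A small sketch of the same idea with pthread_rwlock_tryrdlock (error handling simplified, names illustrative):

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

// try to take the read lock; returns 0 on success, EBUSY when a writer holds it
static int tryRlock(pthread_rwlock_t *lock) {
  int code = pthread_rwlock_tryrdlock(lock);
  if (code != 0 && code != EBUSY) {
    printf("try-rdlock failed: %s\n", strerror(code));  // only unexpected failures are logged
  }
  return code;
}

int main(void) {
  pthread_rwlock_t lock;
  pthread_rwlock_init(&lock, NULL);

  int code = tryRlock(&lock);  // uncontended: returns 0 and the read lock is held
  if (code == 0) {
    pthread_rwlock_unlock(&lock);
  } else if (code == EBUSY) {
    // a writer holds the lock; the caller simply retries later, no error is reported
  }

  pthread_rwlock_destroy(&lock);
  return 0;
}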

View File

@ -17,7 +17,7 @@ from util.sql import *
from util.common import *
class TDTestCase:
updatecfgDict = {'ttlUnit':5,'ttlPushInterval':3}
updatecfgDict = {'ttlUnit':5,'ttlPushInterval':3, 'mdebugflag':143}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)