Merge remote-tracking branch 'origin/main' into enh/useSafySysFunc
This commit is contained in:
commit
b5642faef5
|
@ -164,9 +164,6 @@ If you are using Maven to manage your project, simply add the following dependen
|
|||
pip3 install taospy[ws]
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
- **Installation Verification**
|
||||
|
||||
<Tabs defaultValue="rest">
|
||||
|
@ -199,8 +196,8 @@ import taosws
|
|||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
</TabItem>
|
||||
|
||||
<Tabs>
|
||||
<TabItem label="Go" value="go">
|
||||
|
||||
Edit `go.mod` to add the `driver-go` dependency.
|
||||
|
|
|
@ -1,59 +1,80 @@
|
|||
import subprocess
|
||||
import re
|
||||
|
||||
# 执行 git fetch 命令并捕获输出
|
||||
def git_fetch():
|
||||
result = subprocess.run(['git', 'fetch'], capture_output=True, text=True)
|
||||
return result
|
||||
|
||||
# 解析分支名称
|
||||
def git_prune():
|
||||
# git remote prune origin
|
||||
print("git remote prune origin")
|
||||
result = subprocess.run(['git', 'remote', 'prune', 'origin'], capture_output=True, text=True)
|
||||
return result
|
||||
|
||||
def parse_branch_name_type1(error_output):
|
||||
# 使用正则表达式匹配 'is at' 前的分支名称
|
||||
# error: cannot lock ref 'refs/remotes/origin/fix/3.0/TD-32817': is at 7af5 but expected eaba
|
||||
# match the branch name before ‘is at’ with a regular expression
|
||||
match = re.search(r"error: cannot lock ref '(refs/remotes/origin/[^']+)': is at", error_output)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
# 解析第二种错误中的分支名称
|
||||
def parse_branch_name_type2(error_output):
|
||||
# 使用正则表达式匹配 'exists' 前的第一个引号内的分支名称
|
||||
# match the branch name before ‘exists; cannot create’ with a regular expression
|
||||
match = re.search(r"'(refs/remotes/origin/[^']+)' exists;", error_output)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
# 执行 git update-ref -d 命令
|
||||
# parse branch name from error output of git remote prune origin
|
||||
def parse_branch_name_type3(error_output):
|
||||
# match the branch name before the first single quote before 'Unable to' with a regular expression
|
||||
# git error: could not delete references: cannot lock ref 'refs/remotes/origin/test/3.0/TS-4893': Unable to create 'D:/workspace/main/TDinternal/community/.git/refs/remotes/origin/test/3.0/TS-4893.lock': File exists
|
||||
match = re.search(r"references: cannot lock ref '(refs/remotes/origin/[^']+)': Unable to", error_output)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
|
||||
|
||||
# execute git update-ref -d <branch_name> to delete the ref
|
||||
def git_update_ref(branch_name):
|
||||
if branch_name:
|
||||
subprocess.run(['git', 'update-ref', '-d', f'{branch_name}'], check=True)
|
||||
|
||||
# 解析错误类型并执行相应的修复操作
|
||||
# parse error type and execute corresponding repair operation
|
||||
def handle_error(error_output):
|
||||
# 错误类型1:本地引用的提交ID与远程不一致
|
||||
if "is at" in error_output and "but expected" in error_output:
|
||||
branch_name = parse_branch_name_type1(error_output)
|
||||
if branch_name:
|
||||
print(f"Detected error type 1, attempting to delete ref for branch: {branch_name}")
|
||||
git_update_ref(branch_name)
|
||||
else:
|
||||
print("Error parsing branch name for type 1.")
|
||||
# 错误类型2:尝试创建新的远程引用时,本地已经存在同名的引用
|
||||
elif "exists; cannot create" in error_output:
|
||||
branch_name = parse_branch_name_type2(error_output)
|
||||
if branch_name:
|
||||
print(f"Detected error type 2, attempting to delete ref for branch: {branch_name}")
|
||||
git_update_ref(branch_name)
|
||||
else:
|
||||
print("Error parsing branch name for type 2.")
|
||||
error_types = [
|
||||
("is at", "but expected", parse_branch_name_type1, "type 1"),
|
||||
("exists; cannot create", None, parse_branch_name_type2, "type 2"),
|
||||
("Unable to create", "File exists", parse_branch_name_type3, "type 3")
|
||||
]
|
||||
|
||||
for error_type in error_types:
|
||||
if error_type[0] in error_output and (error_type[1] is None or error_type[1] in error_output):
|
||||
branch_name = error_type[2](error_output)
|
||||
if branch_name:
|
||||
print(f"Detected error {error_type[3]}, attempting to delete ref for branch: {branch_name}")
|
||||
git_update_ref(branch_name)
|
||||
else:
|
||||
print(f"Error parsing branch name for {error_type[3]}.")
|
||||
break
|
||||
|
||||
# 主函数
|
||||
def main():
|
||||
fetch_result = git_fetch()
|
||||
if fetch_result.returncode != 0: # 如果 git fetch 命令失败
|
||||
if fetch_result.returncode != 0:
|
||||
error_output = fetch_result.stderr
|
||||
handle_error(error_output)
|
||||
else:
|
||||
print("Git fetch successful.")
|
||||
|
||||
prune_result = git_prune()
|
||||
print(prune_result.returncode)
|
||||
if prune_result.returncode != 0:
|
||||
error_output = prune_result.stderr
|
||||
print(error_output)
|
||||
handle_error(error_output)
|
||||
else:
|
||||
print("Git prune successful.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -1931,11 +1931,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
return terrno = code;
|
||||
}
|
||||
|
||||
code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
|
||||
if (code) {
|
||||
mError("failed to register trans, transId:%d, and continue", pTrans->id);
|
||||
}
|
||||
}
|
||||
|
||||
if (!includeAllNodes) {
|
||||
|
@ -1951,6 +1946,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
||||
pStream->name, pTrans->id);
|
||||
|
||||
// NOTE: for each stream, we register one trans entry for task update
|
||||
code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
|
||||
if (code) {
|
||||
mError("failed to register trans, transId:%d, and continue", pTrans->id);
|
||||
}
|
||||
|
||||
code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
|
||||
|
||||
// todo: not continue, drop all and retry again
|
||||
|
|
|
@ -35,7 +35,11 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
size_t keyLen = 0;
|
||||
void *pIter = NULL;
|
||||
SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
|
||||
int32_t num = 0;
|
||||
int32_t numOfChkpt = 0;
|
||||
|
||||
if (pNumOfActiveChkpt != NULL) {
|
||||
*pNumOfActiveChkpt = 0;
|
||||
}
|
||||
|
||||
if (pList == NULL) {
|
||||
return terrno;
|
||||
|
@ -50,15 +54,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
void *pKey = taosHashGetKey(pEntry, &keyLen);
|
||||
// key is the name of src/dst db name
|
||||
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
|
||||
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
|
||||
pEntry->startTime);
|
||||
mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
|
||||
pEntry->streamId, pEntry->name, pEntry->startTime);
|
||||
void* p = taosArrayPush(pList, &info);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
||||
num++;
|
||||
numOfChkpt++;
|
||||
}
|
||||
mndReleaseTrans(pMnode, pTrans);
|
||||
}
|
||||
|
@ -78,48 +82,34 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
|||
}
|
||||
}
|
||||
|
||||
mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size,
|
||||
taosHashGetSize(execInfo.transMgmt.pDBTrans), num);
|
||||
mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
|
||||
taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);
|
||||
|
||||
taosArrayDestroy(pList);
|
||||
|
||||
if (pNumOfActiveChkpt != NULL) {
|
||||
*pNumOfActiveChkpt = num;
|
||||
*pNumOfActiveChkpt = numOfChkpt;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
|
||||
// For a given stream:
|
||||
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
|
||||
// 2. create/drop/reset/update trans are conflict with any other trans.
|
||||
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
|
||||
if (lock) {
|
||||
streamMutexLock(&execInfo.lock);
|
||||
}
|
||||
|
||||
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
|
||||
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
|
||||
if (num <= 0) {
|
||||
if (lock) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// if any task updates exist, any other stream trans are not allowed to be created
|
||||
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
|
||||
if (code) {
|
||||
mError("failed to clear finish trans, code:%s", tstrerror(code));
|
||||
mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
|
||||
}
|
||||
|
||||
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
|
||||
if (pEntry != NULL) {
|
||||
SStreamTransInfo tInfo = *pEntry;
|
||||
|
||||
if (lock) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
|
||||
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
||||
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
|
||||
(strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) {
|
||||
|
@ -141,11 +131,25 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char
|
|||
mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
|
||||
// For a given stream:
|
||||
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
|
||||
// 2. create/drop/reset/update trans are conflict with any other trans.
|
||||
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
|
||||
if (lock) {
|
||||
streamMutexLock(&execInfo.lock);
|
||||
}
|
||||
|
||||
int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
|
||||
|
||||
if (lock) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
|
||||
|
|
|
@ -254,11 +254,12 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask
|
|||
}
|
||||
|
||||
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
|
||||
if (code != 0) {
|
||||
smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
|
||||
}
|
||||
streamMetaWLock(pMeta);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
|
|
|
@ -1108,91 +1108,76 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
// always return success to mnode
|
||||
//todo: handle failure of build and send msg to mnode
|
||||
static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code,
|
||||
int32_t taskId) {
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code);
|
||||
if (ret) { // suppress the error in build checkpoint source rsp
|
||||
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret));
|
||||
}
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
}
|
||||
|
||||
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
|
||||
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
int32_t code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
int32_t code = 0;
|
||||
SStreamCheckpointSourceReq req = {0};
|
||||
SDecoder decoder = {0};
|
||||
SStreamTask* pTask = NULL;
|
||||
int64_t checkpointId = 0;
|
||||
|
||||
// disable auto rsp to mnode
|
||||
pRsp->info.handle = NULL;
|
||||
|
||||
SStreamCheckpointSourceReq req = {0};
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode,
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode
|
||||
}
|
||||
|
||||
if (!pTq->pVnode->restored) {
|
||||
tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
|
||||
", transId:%d s-task:0x%x ignore it",
|
||||
vgId, req.checkpointId, req.transId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS; // always return success to mnode
|
||||
}
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||
if (pTask == NULL || code != 0) {
|
||||
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
|
||||
" transId:%d it may have been destroyed",
|
||||
vgId, req.taskId, req.checkpointId, req.transId);
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pTask->status.downstreamReady != 1) {
|
||||
streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); // record the latest failed checkpoint id
|
||||
// record the latest failed checkpoint id
|
||||
streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId);
|
||||
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
|
||||
", transId:%d set it failed",
|
||||
pTask->id.idStr, req.checkpointId, req.transId);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS; // todo retry handle error
|
||||
}
|
||||
|
||||
|
@ -1207,14 +1192,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
|
@ -1226,7 +1204,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
|
||||
// check if the checkpoint msg already sent or not.
|
||||
if (status == TASK_STATUS__CK) {
|
||||
int64_t checkpointId = 0;
|
||||
streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);
|
||||
|
||||
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
|
||||
|
@ -1235,7 +1212,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // checkpoint already finished, and not in checkpoint status
|
||||
if (req.checkpointId <= pTask->chkInfo.checkpointId) {
|
||||
|
@ -1245,15 +1222,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@ -1264,7 +1233,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
if (code) {
|
||||
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (req.mndTrigger) {
|
||||
|
@ -1279,13 +1250,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
|
||||
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SRpcMsg rsp = {0};
|
||||
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
|
||||
if (ret) { // suppress the error in build checkpointsource rsp
|
||||
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return TSDB_CODE_SUCCESS;
|
||||
streamTaskSetCheckpointFailed(pTask); // set the checkpoint failed
|
||||
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
|
|
@ -13,9 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <common/tmsg.h>
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tq.h"
|
||||
|
||||
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
|
||||
|
@ -50,7 +48,7 @@ static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkI
|
|||
static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
|
||||
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
|
||||
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
|
||||
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
|
||||
static void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
|
||||
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
|
||||
int64_t earlyTs);
|
||||
|
||||
|
@ -1062,7 +1060,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
return;
|
||||
}
|
||||
|
||||
reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
|
||||
rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1165,7 +1163,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
|
||||
void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
|
|
@ -17,19 +17,20 @@
|
|||
#include "vnd.h"
|
||||
|
||||
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
||||
#define SCAN_WAL_IDLE_DURATION 100
|
||||
#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan
|
||||
|
||||
typedef struct SBuildScanWalMsgParam {
|
||||
int64_t metaId;
|
||||
int32_t numOfTasks;
|
||||
} SBuildScanWalMsgParam;
|
||||
|
||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
|
||||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
||||
static bool taskReadyForDataFromWal(SStreamTask* pTask);
|
||||
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
|
||||
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
|
||||
static int32_t doScanWalAsync(STQ* pTq, bool ckPause);
|
||||
|
||||
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
||||
int32_t tqScanWal(STQ* pTq) {
|
||||
|
@ -37,12 +38,11 @@ int32_t tqScanWal(STQ* pTq) {
|
|||
int32_t vgId = pMeta->vgId;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
int32_t numOfTasks = 0;
|
||||
bool shouldIdle = true;
|
||||
|
||||
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
|
||||
|
||||
// check all tasks
|
||||
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
|
||||
int32_t code = doScanWalForAllTasks(pMeta);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
|
||||
return code;
|
||||
|
@ -133,10 +133,9 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
|||
}
|
||||
|
||||
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
bool alreadyRestored = pTq->pVnode->restored;
|
||||
int32_t numOfTasks = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
// do not launch the stream tasks, if it is a follower or not restored vnode.
|
||||
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
|
||||
|
@ -144,47 +143,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
|||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
if (numOfTasks == 0) {
|
||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pMeta->startInfo.startAllTasks) {
|
||||
tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
pMeta->scanInfo.scanCounter += 1;
|
||||
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
|
||||
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
|
||||
}
|
||||
|
||||
if (pMeta->scanInfo.scanCounter > 1) {
|
||||
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
|
||||
if (ckPause && numOfTasks == numOfPauseTasks) {
|
||||
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
|
||||
|
||||
// reset the counter value, since we do not launch the scan wal operation.
|
||||
pMeta->scanInfo.scanCounter = 0;
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
|
||||
numOfTasks, alreadyRestored);
|
||||
|
||||
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
code = doScanWalAsync(pTq, ckPause);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -368,11 +328,8 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||
*pScanIdle = true;
|
||||
bool noDataInWal = true;
|
||||
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
|
||||
int32_t vgId = pStreamMeta->vgId;
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
|
||||
if (numOfTasks == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -410,8 +367,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
*pScanIdle = false;
|
||||
|
||||
// seek the stored version and extract data from WAL
|
||||
code = setWalReaderStartOffset(pTask, vgId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -437,7 +392,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
if ((numOfItems > 0) || hasNewData) {
|
||||
noDataInWal = false;
|
||||
code = streamTrySchedExec(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
|
@ -449,11 +403,47 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
}
|
||||
|
||||
// all wal are checked, and no new data available in wal.
|
||||
if (noDataInWal) {
|
||||
*pScanIdle = true;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTaskList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
bool alreadyRestored = pTq->pVnode->restored;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
|
||||
if (numOfTasks == 0) {
|
||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pMeta->startInfo.startAllTasks) {
|
||||
tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
pMeta->scanInfo.scanCounter += 1;
|
||||
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
|
||||
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
|
||||
}
|
||||
|
||||
if (pMeta->scanInfo.scanCounter > 1) {
|
||||
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
|
||||
if (ckPause && numOfTasks == numOfPauseTasks) {
|
||||
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
|
||||
|
||||
// reset the counter value, since we do not launch the scan wal operation.
|
||||
pMeta->scanInfo.scanCounter = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
|
||||
numOfTasks, alreadyRestored);
|
||||
|
||||
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
}
|
||||
|
|
|
@ -718,8 +718,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
|||
}
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// drop the related fill-history task firstly
|
||||
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
||||
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
||||
|
@ -737,7 +735,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
|||
}
|
||||
|
||||
// commit the update
|
||||
streamMetaWLock(pMeta);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
||||
|
||||
|
|
|
@ -2950,17 +2950,16 @@ int32_t nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
|||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
case TSDB_DATA_TYPE_GEOMETRY:
|
||||
pVal->pz = taosMemoryMalloc(pVal->nLen + 1);
|
||||
pVal->pz = taosMemoryCalloc(1, pVal->nLen + 1);
|
||||
if (pVal->pz) {
|
||||
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||
pVal->pz[pVal->nLen] = 0;
|
||||
memcpy(pVal->pz, pNode->datum.p, varDataTLen(pNode->datum.p));
|
||||
} else {
|
||||
code = terrno;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
pVal->nLen = getJsonValueLen(pNode->datum.p);
|
||||
pVal->pz = taosMemoryMalloc(pVal->nLen);
|
||||
pVal->pz = taosMemoryCalloc(1, pVal->nLen);
|
||||
if (pVal->pz) {
|
||||
memcpy(pVal->pz, pNode->datum.p, pVal->nLen);
|
||||
} else {
|
||||
|
|
|
@ -1241,7 +1241,6 @@ EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) {
|
|||
ctx->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
res->node.resType.bytes = varDataTLen(output.columnData->pData);
|
||||
(void)memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
|
||||
} else {
|
||||
ctx->code = nodesSetValueNodeValue(res, output.columnData->pData);
|
||||
|
|
|
@ -1129,7 +1129,11 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
||||
// SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
|
||||
if (NULL == schMgmt.queryMgmt) {
|
||||
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
|
||||
void* p = NULL;
|
||||
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &p, NULL));
|
||||
if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, p)) {
|
||||
qWorkerDestroy(&p);
|
||||
}
|
||||
}
|
||||
|
||||
SArray *explainRes = NULL;
|
||||
|
|
|
@ -37,7 +37,7 @@ extern "C" {
|
|||
#define META_HB_CHECK_INTERVAL 200
|
||||
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
|
||||
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
|
||||
#define STREAM_TASK_QUEUE_CAPACITY 20480
|
||||
#define STREAM_TASK_QUEUE_CAPACITY 5120
|
||||
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
|
||||
|
||||
// clang-format off
|
||||
|
|
|
@ -591,7 +591,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
|
||||
{ // destroy the related fill-history tasks
|
||||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||
streamMetaWUnLock(pMeta);
|
||||
if (pReq->dropRelHTask) {
|
||||
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||
|
@ -599,7 +598,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
id, vgId, pReq->taskId, numOfTasks);
|
||||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
if (pReq->dropRelHTask) {
|
||||
code = streamMetaCommit(pMeta);
|
||||
}
|
||||
|
@ -675,8 +673,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// drop task should not in the meta-lock, and drop the related fill-history task now
|
||||
if (pReq->dropRelHTask) {
|
||||
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
|
||||
|
@ -685,9 +681,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
(int32_t)pReq->hTaskId, numOfTasks);
|
||||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
code = streamMetaCommit(pMeta);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -522,7 +522,10 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int
|
|||
if (pItem->type == STREAM_INPUT__GET_RES) {
|
||||
const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
|
||||
code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
||||
|
||||
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
||||
stDebug("s-task:%s set force_window_close as source block, skey:%"PRId64, id, pTrigger->pBlock->info.window.skey);
|
||||
(*pVer) = pTrigger->pBlock->info.window.skey;
|
||||
}
|
||||
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
|
||||
code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
||||
|
@ -671,7 +674,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock
|
|||
|
||||
doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);
|
||||
|
||||
// update the currentVer if processing the submit blocks.
|
||||
// update the currentVer if processing the submitted blocks.
|
||||
if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
|
||||
stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
|
||||
pInfo->checkpointVer, pInfo->nextProcessVer, ver);
|
||||
|
@ -688,6 +691,34 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock
|
|||
return code;
|
||||
}
|
||||
|
||||
// do nothing after sync executor state to storage backend, untill checkpoint is completed.
|
||||
static int32_t doHandleChkptBlock(SStreamTask* pTask) {
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status
|
||||
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
||||
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
||||
} else { // todo refactor
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||
} else {
|
||||
code = streamTaskSendCheckpointReadyMsg(pTask);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo: let's retry send rsp to upstream/mnode
|
||||
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
|
||||
tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
|
@ -832,36 +863,16 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
if (type != STREAM_INPUT__CHECKPOINT) {
|
||||
if (type == STREAM_INPUT__CHECKPOINT) {
|
||||
code = doHandleChkptBlock(pTask);
|
||||
streamFreeQitem(pInput);
|
||||
return code;
|
||||
} else {
|
||||
code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
|
||||
streamFreeQitem(pInput);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
} else { // todo other thread may change the status
|
||||
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
||||
streamMutexLock(&pTask->lock);
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
if (pState.state == TASK_STATUS__CK) {
|
||||
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
||||
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
||||
} else { // todo refactor
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||
} else {
|
||||
code = streamTaskSendCheckpointReadyMsg(pTask);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo: let's retry send rsp to upstream/mnode
|
||||
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
|
||||
tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamFreeQitem(pInput);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -501,8 +501,6 @@ _err:
|
|||
void streamMetaInitBackend(SStreamMeta* pMeta) {
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
|
||||
if (pMeta->streamBackend == NULL) {
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
while (1) {
|
||||
streamMetaWLock(pMeta);
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
|
||||
|
@ -908,8 +906,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
int32_t code = 0;
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
|
||||
if (code == 0) {
|
||||
// desc the paused task counter
|
||||
|
@ -958,10 +954,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -192,6 +192,7 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
|
|||
const char* id = pTask->id.idStr;
|
||||
int8_t precision = pTask->info.interval.precision;
|
||||
SStreamTrigger* pTrigger = NULL;
|
||||
bool isFull = false;
|
||||
|
||||
while (1) {
|
||||
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
|
||||
|
@ -214,7 +215,6 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
|
|||
|
||||
// check whether the time window gaps exist or not
|
||||
int64_t now = taosGetTimestamp(precision);
|
||||
int64_t ekey = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
|
||||
|
||||
// there are gaps, needs to be filled
|
||||
STimeWindow w = pTrigger->pBlock->info.window;
|
||||
|
@ -226,13 +226,18 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
|
|||
}
|
||||
|
||||
pTask->status.latestForceWindow = w;
|
||||
if (ekey + pTask->info.watermark + pTask->info.interval.interval > now) {
|
||||
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
||||
isFull = streamQueueIsFull(pTask->inputq.queue);
|
||||
|
||||
if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || isFull) {
|
||||
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
||||
if (!isFull) {
|
||||
*pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;
|
||||
}
|
||||
|
||||
*pNextTrigger = ekey + pTask->info.watermark + pTask->info.interval.interval - now;
|
||||
*pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
||||
stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d", id, num, prev,
|
||||
*pNextTrigger);
|
||||
pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval;
|
||||
stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id,
|
||||
num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer);
|
||||
return code;
|
||||
} else {
|
||||
stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
|
||||
|
@ -289,7 +294,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
|
||||
nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 seec
|
||||
nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 sec
|
||||
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL);
|
||||
} else {
|
||||
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
|
|
|
@ -233,15 +233,19 @@ int32_t taosBlockSIGPIPE() {
|
|||
}
|
||||
|
||||
int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) {
|
||||
int32_t code = 0;
|
||||
OS_PARAM_CHECK(fqdn);
|
||||
OS_PARAM_CHECK(ip);
|
||||
int64_t limitMs = 1000;
|
||||
int64_t st = taosGetTimestampMs(), cost = 0;
|
||||
#ifdef WINDOWS
|
||||
// Initialize Winsock
|
||||
WSADATA wsaData;
|
||||
int iResult;
|
||||
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
|
||||
if (iResult != 0) {
|
||||
return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
||||
code = TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -260,12 +264,12 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) {
|
|||
inRetry = true;
|
||||
continue;
|
||||
} else if (EAI_SYSTEM == ret) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
struct sockaddr *sa = result->ai_addr;
|
||||
|
@ -275,8 +279,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) {
|
|||
*ip = ia.s_addr;
|
||||
|
||||
freeaddrinfo(result);
|
||||
|
||||
return 0;
|
||||
goto _err;
|
||||
}
|
||||
#else
|
||||
struct addrinfo hints = {0};
|
||||
|
@ -292,7 +295,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) {
|
|||
struct in_addr ia = si->sin_addr;
|
||||
*ip = ia.s_addr;
|
||||
freeaddrinfo(result);
|
||||
return 0;
|
||||
goto _err;
|
||||
} else {
|
||||
#ifdef EAI_SYSTEM
|
||||
if (ret == EAI_SYSTEM) {
|
||||
|
@ -305,9 +308,16 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) {
|
|||
#endif
|
||||
|
||||
*ip = 0xFFFFFFFF;
|
||||
return TSDB_CODE_RPC_FQDN_ERROR;
|
||||
code = TSDB_CODE_RPC_FQDN_ERROR;
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
_err:
|
||||
cost = taosGetTimestampMs() - st;
|
||||
if (cost >= limitMs) {
|
||||
uWarn("get ip from fqdn:%s, cost:%" PRId64 "ms", fqdn, cost);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t taosGetFqdn(char *fqdn) {
|
||||
|
|
|
@ -426,6 +426,15 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix("==========step4:after wal, all check again ")
|
||||
self.all_test()
|
||||
self.test_TD_33137()
|
||||
|
||||
def test_TD_33137(self):
|
||||
sql = "select 'asd' union all select 'asdasd'"
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
tdSql.checkRows(2)
|
||||
sql = "select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, stable_name `TABLE_NAME`, 'TABLE' `TABLE_TYPE`, table_comment `REMARKS` from information_schema.ins_stables union all select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, table_name `TABLE_NAME`, case when `type`='SYSTEM_TABLE' then 'TABLE' when `type`='NORMAL_TABLE' then 'TABLE' when `type`='CHILD_TABLE' then 'TABLE' else 'UNKNOWN' end `TABLE_TYPE`, table_comment `REMARKS` from information_schema.ins_tables union all select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, view_name `TABLE_NAME`, 'VIEW' `TABLE_TYPE`, NULL `REMARKS` from information_schema.ins_views"
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
tdSql.checkRows(47)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
Loading…
Reference in New Issue