Merge branch '3.0' into fix/TD-17009
This commit is contained in:
commit
104aa670df
|
@ -225,9 +225,9 @@ typedef struct SRequestObj {
|
|||
SArray* targetTableList;
|
||||
SQueryExecMetric metric;
|
||||
SRequestSendRecvBody body;
|
||||
bool syncQuery; // todo refactor: async query object
|
||||
bool stableQuery; // todo refactor
|
||||
bool validateOnly; // todo refactor
|
||||
bool syncQuery; // todo refactor: async query object
|
||||
bool stableQuery; // todo refactor
|
||||
bool validateOnly; // todo refactor
|
||||
bool killed;
|
||||
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
||||
uint32_t retry;
|
||||
|
|
|
@ -591,7 +591,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
|
|||
return code;
|
||||
}
|
||||
|
||||
void freeVgList(void *list) {
|
||||
void freeVgList(void* list) {
|
||||
SArray* pList = *(SArray**)list;
|
||||
taosArrayDestroy(pList);
|
||||
}
|
||||
|
@ -1278,8 +1278,8 @@ int32_t doProcessMsgFromServer(void* param) {
|
|||
char tbuf[40] = {0};
|
||||
TRACE_TO_STR(trace, tbuf);
|
||||
|
||||
tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code),
|
||||
tbuf);
|
||||
tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), tbuf);
|
||||
|
||||
if (pSendInfo->requestObjRefId != 0) {
|
||||
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||
|
@ -2114,7 +2114,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = execQuery(connId, sql, sqlLen, validateOnly);
|
||||
TAOS_RES* pRes = execQuery(*(int64_t*)taos, sql, sqlLen, validateOnly);
|
||||
return pRes;
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -374,11 +374,12 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
|||
p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData);
|
||||
taosWUnLockLatch(&pMemTable->latch);
|
||||
|
||||
tsdbDebug("vgId:%d add table data %p at idx:%d", TD_VID(pMemTable->pTsdb->pVnode), pTbData, idx);
|
||||
|
||||
if (p == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
*ppTbData = pTbData;
|
||||
return code;
|
||||
|
|
|
@ -694,8 +694,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
|
|||
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
|
||||
|
||||
_write_block:
|
||||
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW,
|
||||
pWriter->cmprAlg);
|
||||
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
|
||||
&pWriter->blockW, pWriter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
|
||||
|
@ -756,7 +756,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
|
|||
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
|
||||
ASSERT(pWriter->pDataFReader);
|
||||
|
||||
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock);
|
||||
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
|
||||
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
|
||||
|
||||
ASSERT(c >= 0);
|
||||
|
@ -833,7 +833,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
|||
}
|
||||
|
||||
_exit:
|
||||
tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode));
|
||||
tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
|
|
@ -288,7 +288,7 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
// apply the commit (TODO)
|
||||
walEndSnapshot(pVnode->pWal);
|
||||
|
||||
vInfo("vgId:%d, commit over", TD_VID(pVnode));
|
||||
vInfo("vgId:%d, commit end", TD_VID(pVnode));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -180,6 +180,7 @@ struct SVSnapWriter {
|
|||
SVnode *pVnode;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
int64_t commitID;
|
||||
int64_t index;
|
||||
// meta
|
||||
SMetaSnapWriter *pMetaSnapWriter;
|
||||
|
@ -201,7 +202,16 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
|
|||
pWriter->sver = sver;
|
||||
pWriter->ever = ever;
|
||||
|
||||
vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode));
|
||||
// commit it
|
||||
code = vnodeCommit(pVnode);
|
||||
if (code) goto _err;
|
||||
|
||||
// inc commit ID
|
||||
pVnode->state.commitID++;
|
||||
pWriter->commitID = pVnode->state.commitID;
|
||||
|
||||
vInfo("vgId:%d vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
|
||||
sver, ever, pWriter->commitID);
|
||||
*ppWriter = pWriter;
|
||||
return code;
|
||||
|
||||
|
@ -244,6 +254,8 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
|||
|
||||
code = vnodeCommitInfo(dir, &info);
|
||||
if (code) goto _err;
|
||||
|
||||
vnodeBegin(pVnode);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -251,15 +251,29 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
}
|
||||
}
|
||||
|
||||
bool assignUid = false;
|
||||
|
||||
if (LIST_LENGTH(pScanInfo->pGroupTags) > 0) {
|
||||
SNode* p = nodesListGetNode(pScanInfo->pGroupTags, 0);
|
||||
if (p->type == QUERY_NODE_FUNCTION) {
|
||||
// partition by tbname/group by tbname
|
||||
assignUid = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
|
||||
uint64_t* uid = taosArrayGet(qa, i);
|
||||
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
||||
|
||||
if (bufLen > 0) {
|
||||
code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
|
||||
&keyInfo.groupId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
if (assignUid) {
|
||||
keyInfo.groupId = keyInfo.uid;
|
||||
} else {
|
||||
code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
|
||||
&keyInfo.groupId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -513,7 +513,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
msgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
msgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
|
||||
|
||||
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
|
||||
|
@ -541,7 +541,7 @@ _return:
|
|||
}
|
||||
|
||||
taosMemoryFree(msg);
|
||||
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
@ -681,7 +681,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
|||
param->pTrans = pJob->conn.pTrans;
|
||||
|
||||
pMsgSendInfo->param = param;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->fp = fp;
|
||||
|
||||
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
|
||||
|
@ -801,7 +801,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
|
|||
pDst->param = NULL;
|
||||
|
||||
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
|
||||
pDst->paramFreeFp = taosMemoryFree;
|
||||
pDst->paramFreeFp = taosMemoryFree;
|
||||
|
||||
*dst = pDst;
|
||||
|
||||
|
@ -1094,14 +1094,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
break;
|
||||
}
|
||||
|
||||
#if 1
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
|
||||
}
|
||||
#else
|
||||
if (TDMT_VND_SUBMIT != msgType) {
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
|
||||
}
|
||||
} else {
|
||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
}
|
||||
#endif
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -489,7 +489,7 @@ class TDDnode:
|
|||
onlyKillOnceWindows = 0
|
||||
while(processID):
|
||||
if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
|
||||
killCmd = "kill -4 %s > /dev/null 2>&1" % processID
|
||||
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
|
||||
os.system(killCmd)
|
||||
onlyKillOnceWindows = 1
|
||||
time.sleep(1)
|
||||
|
@ -503,7 +503,7 @@ class TDDnode:
|
|||
time.sleep(2)
|
||||
|
||||
self.running = 0
|
||||
tdLog.debug("dnode:%d is stopped by kill -4" % (self.index))
|
||||
tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
|
||||
|
||||
|
||||
def stoptaosd(self):
|
||||
|
@ -527,7 +527,7 @@ class TDDnode:
|
|||
onlyKillOnceWindows = 0
|
||||
while(processID):
|
||||
if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
|
||||
killCmd = "kill -4 %s > /dev/null 2>&1" % processID
|
||||
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
|
||||
os.system(killCmd)
|
||||
onlyKillOnceWindows = 1
|
||||
time.sleep(1)
|
||||
|
@ -537,7 +537,7 @@ class TDDnode:
|
|||
time.sleep(2)
|
||||
|
||||
self.running = 0
|
||||
tdLog.debug("dnode:%d is stopped by kill -4" % (self.index))
|
||||
tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
|
||||
|
||||
def forcestop(self):
|
||||
if (not self.remoteIP == ""):
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
sql connect
|
||||
sql create dnode $hostname port 7200
|
||||
sql create dnode $hostname port 7300
|
||||
sql create dnode $hostname port 7400
|
||||
|
||||
$x = 0
|
||||
step1:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
print ===> $data20 $data21 $data22 $data23 $data24 $data25
|
||||
print ===> $data30 $data31 $data32 $data33 $data34 $data35
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(3)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(4)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
|
||||
$replica = 3
|
||||
$vgroups = 1
|
||||
$retentions = 5s:7d,15s:21d
|
||||
|
||||
print ============= create database
|
||||
sql create database db replica $replica vgroups $vgroups retentions $retentions
|
||||
|
||||
$loop_cnt = 0
|
||||
check_db_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 100 then
|
||||
print ====> db not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show databases
|
||||
print ===> rows: $rows
|
||||
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data[2][15] != ready then
|
||||
goto check_db_ready
|
||||
endi
|
||||
|
||||
sql use db
|
||||
|
||||
$loop_cnt = 0
|
||||
check_vg_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 300 then
|
||||
print ====> vgroups not ready!
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show vgroups
|
||||
print ===> rows: $rows
|
||||
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
|
||||
|
||||
if $rows != $vgroups then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][4] == leader then
|
||||
if $data[0][6] == follower then
|
||||
if $data[0][8] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
|
||||
endi
|
||||
endi
|
||||
elif $data[0][6] == leader then
|
||||
if $data[0][4] == follower then
|
||||
if $data[0][8] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
|
||||
endi
|
||||
endi
|
||||
elif $data[0][8] == leader then
|
||||
if $data[0][4] == follower then
|
||||
if $data[0][6] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
|
||||
endi
|
||||
endi
|
||||
else
|
||||
goto check_vg_ready
|
||||
endi
|
||||
|
||||
|
||||
vg_ready:
|
||||
print ====> create stable/child table
|
||||
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum)
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create table ct1 using stb tags(1000)
|
||||
|
||||
|
||||
print ===> stop dnode4
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
sleep 3000
|
||||
|
||||
|
||||
print ===> write 100 records
|
||||
$N = 100
|
||||
$count = 0
|
||||
while $count < $N
|
||||
$ms = 1659000000000 + $count
|
||||
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
|
||||
$count = $count + 1
|
||||
endw
|
||||
|
||||
|
||||
#sql flush database db;
|
||||
|
||||
|
||||
sleep 3000
|
||||
|
||||
|
||||
print ===> stop dnode1 dnode2 dnode3
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
|
||||
sleep 10000
|
||||
|
||||
########################################################
|
||||
print ===> start dnode1 dnode2 dnode3 dnode4
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
sleep 3000
|
||||
|
||||
print =============== query data
|
||||
sql connect
|
||||
sql use db
|
||||
sql select * from ct1
|
||||
print rows: $rows
|
||||
print $data00 $data01 $data02
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
endi
|
|
@ -175,7 +175,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M
|
|||
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
|
||||
|
||||
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
|
||||
|
||||
|
|
Loading…
Reference in New Issue