diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b4c0fd380e..2831e7a904 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -225,9 +225,9 @@ typedef struct SRequestObj { SArray* targetTableList; SQueryExecMetric metric; SRequestSendRecvBody body; - bool syncQuery; // todo refactor: async query object - bool stableQuery; // todo refactor - bool validateOnly; // todo refactor + bool syncQuery; // todo refactor: async query object + bool stableQuery; // todo refactor + bool validateOnly; // todo refactor bool killed; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 68f47ba915..6f8e7d0237 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -591,7 +591,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray return code; } -void freeVgList(void *list) { +void freeVgList(void* list) { SArray* pList = *(SArray**)list; taosArrayDestroy(pList); } @@ -1278,8 +1278,8 @@ int32_t doProcessMsgFromServer(void* param) { char tbuf[40] = {0}; TRACE_TO_STR(trace, tbuf); - tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), - tbuf); + tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), tbuf); if (pSendInfo->requestObjRefId != 0) { SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); @@ -2114,7 +2114,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { return NULL; } - TAOS_RES* pRes = execQuery(connId, sql, sqlLen, validateOnly); + TAOS_RES* pRes = execQuery(*(int64_t*)taos, sql, sqlLen, validateOnly); return pRes; #endif } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index fa775bb882..50d7de3e11 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -374,11 +374,12 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData); taosWUnLockLatch(&pMemTable->latch); + tsdbDebug("vgId:%d add table data %p at idx:%d", TD_VID(pMemTable->pTsdb->pVnode), pTbData, idx); + if (p == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - _exit: *ppTbData = pTbData; return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 43537c9a8d..51d7edcf71 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -694,8 +694,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; _write_block: - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW, - pWriter->cmprAlg); + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); if (code) goto _err; code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); @@ -756,7 +756,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { ASSERT(pWriter->pDataFReader); - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); ASSERT(c >= 0); @@ -833,7 +833,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { } _exit: - tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); + tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); return code; _err: diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 1207df8058..e8d029a88f 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -288,7 +288,7 @@ int vnodeCommit(SVnode *pVnode) { // apply the commit (TODO) walEndSnapshot(pVnode->pWal); - vInfo("vgId:%d, commit over", TD_VID(pVnode)); + vInfo("vgId:%d, commit end", TD_VID(pVnode)); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 7241ca4971..3f8f81cb09 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -180,6 +180,7 @@ struct SVSnapWriter { SVnode *pVnode; int64_t sver; int64_t ever; + int64_t commitID; int64_t index; // meta SMetaSnapWriter *pMetaSnapWriter; @@ -201,7 +202,16 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr pWriter->sver = sver; pWriter->ever = ever; - vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode)); + // commit it + code = vnodeCommit(pVnode); + if (code) goto _err; + + // inc commit ID + pVnode->state.commitID++; + pWriter->commitID = pVnode->state.commitID; + + vInfo("vgId:%d vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), + sver, ever, pWriter->commitID); *ppWriter = pWriter; return code; @@ -244,6 +254,8 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * code = vnodeCommitInfo(dir, &info); if (code) goto _err; + + vnodeBegin(pVnode); } else { ASSERT(0); } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7f88e628c1..171f39db1b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -251,15 +251,29 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } + bool assignUid = false; + + if (LIST_LENGTH(pScanInfo->pGroupTags) > 0) { + SNode* p = nodesListGetNode(pScanInfo->pGroupTags, 0); + if (p->type == QUERY_NODE_FUNCTION) { + // partition by tbname/group by tbname + assignUid = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0); + } + } + for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { uint64_t* uid = taosArrayGet(qa, i); STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; if (bufLen > 0) { - code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, - &keyInfo.groupId); - if (code != TSDB_CODE_SUCCESS) { - return code; + if (assignUid) { + keyInfo.groupId = keyInfo.uid; + } else { + code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, + &keyInfo.groupId); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 67050241e3..3bec91226c 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -513,7 +513,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - msgSendInfo->paramFreeFp = taosMemoryFree; + msgSendInfo->paramFreeFp = taosMemoryFree; SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param)); SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp)); @@ -541,7 +541,7 @@ _return: } taosMemoryFree(msg); - + SCH_RET(code); } @@ -681,7 +681,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { param->pTrans = pJob->conn.pTrans; pMsgSendInfo->param = param; - pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = fp; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo}; @@ -801,7 +801,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) { pDst->param = NULL; SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param)); - pDst->paramFreeFp = taosMemoryFree; + pDst->paramFreeFp = taosMemoryFree; *dst = pDst; @@ -1094,14 +1094,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } +#if 1 SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); - + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); } +#else + if (TDMT_VND_SUBMIT != msgType) { + SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; + schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); + msg = NULL; + SCH_ERR_JRET(code); + + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { + SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); + } + } else { + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + } +#endif return TSDB_CODE_SUCCESS; diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 46af61d474..20e4e4abe6 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -489,7 +489,7 @@ class TDDnode: onlyKillOnceWindows = 0 while(processID): if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'): - killCmd = "kill -4 %s > /dev/null 2>&1" % processID + killCmd = "kill -INT %s > /dev/null 2>&1" % processID os.system(killCmd) onlyKillOnceWindows = 1 time.sleep(1) @@ -503,7 +503,7 @@ class TDDnode: time.sleep(2) self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -4" % (self.index)) + tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) def stoptaosd(self): @@ -527,7 +527,7 @@ class TDDnode: onlyKillOnceWindows = 0 while(processID): if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'): - killCmd = "kill -4 %s > /dev/null 2>&1" % processID + killCmd = "kill -INT %s > /dev/null 2>&1" % processID os.system(killCmd) onlyKillOnceWindows = 1 time.sleep(1) @@ -537,7 +537,7 @@ class TDDnode: time.sleep(2) self.running = 0 - tdLog.debug("dnode:%d is stopped by kill -4" % (self.index)) + tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) def forcestop(self): if (not self.remoteIP == ""): diff --git a/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim b/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim new file mode 100644 index 0000000000..ec03aaf9db --- /dev/null +++ b/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim @@ -0,0 +1,172 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +print ===> $data30 $data31 $data32 $data33 $data34 $data35 +if $rows != 4 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi +if $data(3)[4] != ready then + goto step1 +endi +if $data(4)[4] != ready then + goto step1 +endi + +$replica = 3 +$vgroups = 1 +$retentions = 5s:7d,15s:21d + +print ============= create database +sql create database db replica $replica vgroups $vgroups retentions $retentions + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 100 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19] +if $rows != 3 then + return -1 +endi +if $data[2][15] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 300 then + print ====> vgroups not ready! + return -1 +endi + +sql show vgroups +print ===> rows: $rows +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11] + +if $rows != $vgroups then + return -1 +endi + +if $data[0][4] == leader then + if $data[0][6] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + endi + endi +elif $data[0][6] == leader then + if $data[0][4] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + endi + endi +elif $data[0][8] == leader then + if $data[0][4] == follower then + if $data[0][6] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + endi + endi +else + goto check_vg_ready +endi + + +vg_ready: +print ====> create stable/child table +sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) + +sql show stables +if $rows != 1 then + return -1 +endi + +sql create table ct1 using stb tags(1000) + + +print ===> stop dnode4 +system sh/exec.sh -n dnode4 -s stop -x SIGINT +sleep 3000 + + +print ===> write 100 records +$N = 100 +$count = 0 +while $count < $N + $ms = 1659000000000 + $count + sql insert into ct1 values( $ms , $count , 2.1, 3.1) + $count = $count + 1 +endw + + +#sql flush database db; + + +sleep 3000 + + +print ===> stop dnode1 dnode2 dnode3 +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT + +sleep 10000 + +######################################################## +print ===> start dnode1 dnode2 dnode3 dnode4 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sleep 3000 + +print =============== query data +sql connect +sql use db +sql select * from ct1 +print rows: $rows +print $data00 $data01 $data02 +if $rows != 100 then + return -1 +endi \ No newline at end of file diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 7a0447cd6c..458951b815 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -175,7 +175,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 -# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3