Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
54liuyao 2024-10-17 17:12:00 +08:00
commit e825d74213
45 changed files with 603 additions and 200 deletions

View File

@ -169,11 +169,48 @@ ELSE ()
SET(COMPILER_SUPPORT_AVX512VL false)
ELSE()
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
INCLUDE(CheckCSourceRuns)
SET(CMAKE_REQUIRED_FLAGS "-mavx")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256d a, b, c;
double buf[4] = {0};
a = _mm256_loadu_pd(buf);
b = _mm256_loadu_pd(buf);
c = _mm256_add_pd(a, b);
_mm256_storeu_pd(buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
if (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX)
SET(CMAKE_REQUIRED_FLAGS "-mavx2")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256i a, b, c;
int buf[8] = {0};
a = _mm256_loadu_si256((__m256i *)buf);
b = _mm256_loadu_si256((__m256i *)buf);
c = _mm256_and_si256(a, b);
_mm256_storeu_si256((__m256i *)buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
if (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX2)
ENDIF()
IF (COMPILER_SUPPORT_SSE42)

View File

@ -10,7 +10,7 @@ description: 3.3.3.0 版本说明
4. TDengine支持macOS企业版客户端 [企业版]
5. taosX日志默认不写入syslog [企业版]
6. 服务端记录所有慢查询信息到log库
7. show cluster machines 查询结果中添加服务端版本号
7. show cluster machines 查询结果中添加服务端版本号 [企业版]
8. 删除保留关键字LEVEL/ENCODE/COMPRESS, 可以作为列名/表名/数据库名等使用
9. 禁止动态修改临时目录
10. round 函数:支持四舍五入的精度

View File

@ -757,7 +757,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaAcquireOneTask(SStreamTask* pTask);
int32_t streamMetaAcquireOneTask(SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -50,6 +50,7 @@ typedef struct {
int32_t rollPeriod; // secs
int64_t retentionSize;
int64_t segSize;
int64_t committed;
EWalType level; // wal level
int32_t encryptAlgorithm;
char encryptKey[ENCRYPT_KEY_LEN + 1];

View File

@ -84,7 +84,7 @@ void taos_cleanup(void) {
taosCloseRef(id);
nodesDestroyAllocatorSet();
cleanupAppInfo();
// cleanupAppInfo();
rpcCleanup();
tscDebug("rpc cleanup");

View File

@ -2625,6 +2625,8 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
lino = __LINE__;
goto _exit;
} else { // reset the length value
code = TSDB_CODE_SUCCESS;
}
len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) goto _exit;

View File

@ -182,6 +182,7 @@ static void dmSetSignalHandle() {
}
#endif
}
extern bool generateNewMeta;
static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
global.startTime = taosGetTimestampMs();
@ -221,6 +222,8 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
global.dumpSdb = true;
} else if (strcmp(argv[i], "-dTxn") == 0) {
global.deleteTrans = true;
} else if (strcmp(argv[i], "-r") == 0) {
generateNewMeta = true;
} else if (strcmp(argv[i], "-E") == 0) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) {

View File

@ -123,6 +123,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
int32_t code = 0;
SStatusReq req = {0};
dDebug("send status req to mnode, statusSeq:%d, begin to mgnt lock", pMgmt->statusSeq);
(void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.sver = tsVersion;
req.dnodeVer = pMgmt->pData->dnodeVer;
@ -161,14 +162,17 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads;
dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo);
req.mload = minfo.load;
dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
(*pMgmt->getQnodeLoadsFp)(&req.qload);
pMgmt->statusSeq++;
@ -206,6 +210,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
int8_t epUpdated = 0;
(void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
dDebug("send status req to mnode, statusSeq:%d, begin to send rpc msg", pMgmt->statusSeq);
code =
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (code != 0) {

View File

@ -515,6 +515,7 @@ static int32_t mndInitWal(SMnode *pMnode) {
.fsyncPeriod = 0,
.rollPeriod = -1,
.segSize = -1,
.committed = -1,
.retentionPeriod = 0,
.retentionSize = 0,
.level = TAOS_WAL_FSYNC,

View File

@ -1782,7 +1782,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
int32_t code = 0;
int32_t code = 0;
if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
return code;
@ -1810,6 +1810,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return 0;
}
mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;

View File

@ -61,7 +61,6 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
@ -93,7 +92,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
@ -106,19 +104,18 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
terrno = code;
taosMemoryFree(pReq);
return terrno;
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return terrno;
return code;
}
mDebug("set the resume action for trans:%d", pTrans->id);
return 0;
return code;
}
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {

View File

@ -81,6 +81,9 @@ typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;
#define VNODE_META_TMP_DIR "meta.tmp"
#define VNODE_META_BACKUP_DIR "meta.backup"
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
#define VNODE_TQ_DIR "tq"

View File

@ -133,7 +133,7 @@ static void doScan(SMeta *pMeta) {
}
}
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
static int32_t metaOpenImpl(SVnode *pVnode, SMeta **ppMeta, const char *metaDir, int8_t rollback) {
SMeta *pMeta = NULL;
int32_t code = 0;
int32_t lino;
@ -144,7 +144,11 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
// create handle
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, metaDir);
if (strncmp(metaDir, VNODE_META_TMP_DIR, strlen(VNODE_META_TMP_DIR)) == 0) {
taosRemoveDir(path);
}
if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + strlen(path) + 1)) == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
@ -245,6 +249,188 @@ _exit:
return code;
}
bool generateNewMeta = false;
static int32_t metaGenerateNewMeta(SMeta **ppMeta) {
SMeta *pNewMeta = NULL;
SMeta *pMeta = *ppMeta;
SVnode *pVnode = pMeta->pVnode;
metaInfo("vgId:%d start to generate new meta", TD_VID(pMeta->pVnode));
// Open a new meta for orgainzation
int32_t code = metaOpenImpl(pMeta->pVnode, &pNewMeta, VNODE_META_TMP_DIR, false);
if (code) {
return code;
}
code = metaBegin(pNewMeta, META_BEGIN_HEAP_NIL);
if (code) {
return code;
}
// i == 0, scan super table
// i == 1, scan normal table and child table
for (int i = 0; i < 2; i++) {
TBC *uidCursor = NULL;
int32_t counter = 0;
code = tdbTbcOpen(pMeta->pUidIdx, &uidCursor, NULL);
if (code) {
metaError("vgId:%d failed to open uid index cursor, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
code = tdbTbcMoveToFirst(uidCursor);
if (code) {
metaError("vgId:%d failed to move to first, reason:%s", TD_VID(pVnode), tstrerror(code));
tdbTbcClose(uidCursor);
return code;
}
for (;;) {
const void *pKey;
int kLen;
const void *pVal;
int vLen;
if (tdbTbcGet(uidCursor, &pKey, &kLen, &pVal, &vLen) < 0) {
break;
}
tb_uid_t uid = *(tb_uid_t *)pKey;
SUidIdxVal *pUidIdxVal = (SUidIdxVal *)pVal;
if ((i == 0 && (pUidIdxVal->suid && pUidIdxVal->suid == uid)) // super table
|| (i == 1 && (pUidIdxVal->suid == 0 || pUidIdxVal->suid != uid)) // normal table and child table
) {
counter++;
if (i == 0) {
metaInfo("vgId:%d counter:%d new meta handle %s table uid:%" PRId64, TD_VID(pVnode), counter, "super", uid);
} else {
metaInfo("vgId:%d counter:%d new meta handle %s table uid:%" PRId64, TD_VID(pVnode), counter,
pUidIdxVal->suid == 0 ? "normal" : "child", uid);
}
// fetch table entry
void *value = NULL;
int valueSize = 0;
if (tdbTbGet(pMeta->pTbDb,
&(STbDbKey){
.version = pUidIdxVal->version,
.uid = uid,
},
sizeof(uid), &value, &valueSize) == 0) {
SDecoder dc = {0};
SMetaEntry me = {0};
tDecoderInit(&dc, value, valueSize);
if (metaDecodeEntry(&dc, &me) == 0) {
if (metaHandleEntry(pNewMeta, &me) != 0) {
metaError("vgId:%d failed to handle entry, uid:%" PRId64, TD_VID(pVnode), uid);
}
}
tDecoderClear(&dc);
}
tdbFree(value);
}
code = tdbTbcMoveToNext(uidCursor);
if (code) {
metaError("vgId:%d failed to move to next, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
}
tdbTbcClose(uidCursor);
}
code = metaCommit(pNewMeta, pNewMeta->txn);
if (code) {
metaError("vgId:%d failed to commit, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
code = metaFinishCommit(pNewMeta, pNewMeta->txn);
if (code) {
metaError("vgId:%d failed to finish commit, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
if ((code = metaBegin(pNewMeta, META_BEGIN_HEAP_NIL)) != 0) {
metaError("vgId:%d failed to begin new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
}
metaClose(&pNewMeta);
metaInfo("vgId:%d finish to generate new meta", TD_VID(pVnode));
return 0;
}
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
if (generateNewMeta) {
char path[TSDB_FILENAME_LEN] = {0};
char oldMetaPath[TSDB_FILENAME_LEN] = {0};
char newMetaPath[TSDB_FILENAME_LEN] = {0};
char backupMetaPath[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
snprintf(oldMetaPath, sizeof(oldMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_DIR);
snprintf(newMetaPath, sizeof(newMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_TMP_DIR);
snprintf(backupMetaPath, sizeof(backupMetaPath) - 1, "%s%s%s", path, TD_DIRSEP, VNODE_META_BACKUP_DIR);
bool oldMetaExist = taosCheckExistFile(oldMetaPath);
bool newMetaExist = taosCheckExistFile(newMetaPath);
bool backupMetaExist = taosCheckExistFile(backupMetaPath);
if ((!backupMetaExist && !oldMetaExist && newMetaExist) // case 2
|| (backupMetaExist && !oldMetaExist && !newMetaExist) // case 4
|| (backupMetaExist && oldMetaExist && newMetaExist) // case 8
) {
metaError("vgId:%d invalid meta state, please check", TD_VID(pVnode));
return TSDB_CODE_FAILED;
} else if ((backupMetaExist && oldMetaExist && !newMetaExist) // case 7
|| (!backupMetaExist && !oldMetaExist && !newMetaExist) // case 1
) {
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
} else if (backupMetaExist && !oldMetaExist && newMetaExist) {
if (taosRenameFile(newMetaPath, oldMetaPath) != 0) {
metaError("vgId:%d failed to rename new meta to old meta, reason:%s", TD_VID(pVnode), tstrerror(terrno));
return terrno;
}
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
} else {
int32_t code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
if (code) {
return code;
}
code = metaGenerateNewMeta(ppMeta);
if (code) {
metaError("vgId:%d failed to generate new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
}
metaClose(ppMeta);
if (taosRenameFile(oldMetaPath, backupMetaPath) != 0) {
metaError("vgId:%d failed to rename old meta to backup, reason:%s", TD_VID(pVnode), tstrerror(terrno));
return terrno;
}
// rename the new meta to old meta
if (taosRenameFile(newMetaPath, oldMetaPath) != 0) {
metaError("vgId:%d failed to rename new meta to old meta, reason:%s", TD_VID(pVnode), tstrerror(terrno));
return terrno;
}
code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, false);
if (code) {
metaError("vgId:%d failed to open new meta, reason:%s", TD_VID(pVnode), tstrerror(code));
return code;
}
}
} else {
return metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
}
return TSDB_CODE_SUCCESS;
}
int32_t metaUpgrade(SVnode *pVnode, SMeta **ppMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino;

View File

@ -2985,9 +2985,6 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
}
}
end:
if (terrno != 0) {
ret = terrno;
}
tDecoderClear(&dc);
tdbFree(pData);
return ret;

View File

@ -694,7 +694,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
streamMetaAcquireOneTask(*ppTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask);
SStreamTask* pTask = *ppTask;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -1121,10 +1121,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
int32_t vgId = pMeta->vgId;
int32_t code = 0;
if (pTask == NULL) {
return -1;
}
streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask).state;
@ -1152,7 +1148,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
}
}
streamMetaReleaseTask(pMeta, pTask);
return code;
}
@ -1175,6 +1170,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) {
streamMetaReleaseTask(pMeta, pTask);
return code;
}
@ -1188,6 +1184,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
streamMutexUnlock(&pHTask->lock);
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
streamMetaReleaseTask(pMeta, pHTask);
}
return code;

View File

@ -602,14 +602,14 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs
SSttLvl *lvl;
code = tsdbSttLvlInitRef(pTsdb, lvl1, &lvl);
if (code) {
taosMemoryFree(lvl);
tsdbSttLvlClear(&lvl);
tsdbTFileSetClear(fset);
return code;
}
code = TARRAY2_APPEND(fset[0]->lvlArr, lvl);
if (code) {
taosMemoryFree(lvl);
tsdbSttLvlClear(&lvl);
tsdbTFileSetClear(fset);
return code;
}

View File

@ -855,6 +855,7 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList);
if (p == NULL) {
clearBrinBlockIter(&iter);
tsdbError("invalid param, empty in tablescanInfoList, %s", pReader->idStr);
return TSDB_CODE_INVALID_PARA;
}
@ -5256,7 +5257,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
// NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit
// the data should be ingested in round-robin and all the child tables should be createted before ingesting data
// the version range of query will be used to identify the correctness of suspend/resume functions.
// this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files
// this function will be blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files
#if SUSPEND_RESUME_TEST
if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) {
tsem_wait(&pReader->resumeAfterSuspend);
@ -5909,6 +5910,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
} else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
} else {
code = TSDB_CODE_INVALID_PARA;
tsdbError("invalid mr.me.type:%d, code:%s", mr.me.type, tstrerror(code));
metaReaderClear(&mr);
return code;
}

View File

@ -45,6 +45,7 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
.retentionPeriod = -1,
.rollPeriod = 0,
.segSize = 0,
.committed = 0,
.retentionSize = -1,
.level = TAOS_WAL_WRITE,
.clearFiles = 0,

View File

@ -257,6 +257,7 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
code = vnodeDecodeInfo(pData, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
pInfo->config.walCfg.committed = pInfo->state.committed;
_exit:
if (code) {
if (pFile) {

View File

@ -633,45 +633,44 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
}
break;
case TDMT_STREAM_TASK_DEPLOY: {
if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != 0) {
if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != TSDB_CODE_SUCCESS) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
if ((code = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_UPDATE_CHKPT: {
if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
if ((code = tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_CONSEN_CHKPT: {
if (pVnode->restored) {
if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) {
goto _err;
}
if (pVnode->restored && (code = tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg)) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_PAUSE: {
if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
(code = tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_RESUME: {
if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
(code = tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err;
}
} break;
case TDMT_VND_STREAM_TASK_RESET: {
if (pVnode->restored && vnodeIsLeader(pVnode)) {
if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) {
if (pVnode->restored && vnodeIsLeader(pVnode) &&
(code = tqProcessTaskResetReq(pVnode->pTq, pMsg)) < 0) {
goto _err;
}
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
@ -691,10 +690,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_DROP_INDEX:
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
break;
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: // always return true
tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
break;
case TDMT_VND_STREAM_TASK_UPDATE:
case TDMT_VND_STREAM_TASK_UPDATE: // always return true
tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
break;
case TDMT_VND_COMPACT:
@ -750,7 +749,7 @@ _exit:
_err:
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(terrno), ver);
tstrerror(code), ver);
return code;
}

View File

@ -551,7 +551,7 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
(int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
*len += tsnprintf(buf + VARSTR_HEADER_SIZE + *len, sizeof(type) - (VARSTR_HEADER_SIZE + *len), "%s`%s` %s",
*len += tsnprintf(buf + VARSTR_HEADER_SIZE + *len, SHOW_CREATE_TB_RESULT_FIELD2_LEN - (VARSTR_HEADER_SIZE + *len), "%s`%s` %s",
((i > 0) ? ", " : ""), pSchema->name, type);
}
}

View File

@ -278,7 +278,7 @@ bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bo
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo, bool genAfterBlock) {
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t rows = pResBlock->info.rows;
@ -427,7 +427,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
if (start.key == INT64_MIN || end.key == INT64_MIN || genAfterBlock) {
if (start.key == INT64_MIN || end.key == INT64_MIN) {
colDataSetNULL(pDst, rows);
break;
}
@ -463,13 +463,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
if (genAfterBlock && rows == 0) {
hasInterp = false;
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
if (pkey->isNull == false && !genAfterBlock) {
if (pkey->isNull == false) {
code = colDataSetVal(pDst, rows, pkey->pData, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -841,7 +836,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo, false) &&
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
@ -869,7 +864,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
doKeepLinearInfo(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo, false) &&
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
@ -914,12 +909,13 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
if (pSliceInfo->pPrevGroupKey == NULL) {
if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR ||
pSliceInfo->pPrevGroupKey == NULL) {
return;
}
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo, true);
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}

View File

@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
return;
}
/*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);

View File

@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId;

View File

@ -1162,7 +1162,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");

View File

@ -753,12 +753,17 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task
return code;
}
void streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
return ref;
}
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
if (pTask == NULL) {
return;
}
int32_t taskId = pTask->id.taskId;
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
@ -862,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it
// it is a fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) {
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
}

View File

@ -58,7 +58,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
}
}
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
int32_t ref = streamMetaAcquireOneTask(pTask);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", id, ref, pTask->info.delaySchedParam);
streamTmrStart(streamTaskSchedHelper, delay, pTask, streamTimer, &pTask->schedInfo.pDelayTimer,
@ -97,7 +97,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
pRunReq->reqType = execType;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
}
return code;
}
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
@ -110,7 +114,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
pTask->status.schedIdleTime, ref);
// add one ref count for task
streamMetaAcquireOneTask(pTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
pTask->pMeta->vgId, "resume-task-tmr");
}

View File

@ -260,10 +260,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->inputq.queue) {
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
pTask->inputq.queue = NULL;
}
if (pTask->outputq.queue) {
streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
pTask->outputq.queue = NULL;
}
if (pTask->exec.qmsg) {
@ -277,6 +279,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->exec.pWalReader != NULL) {
walCloseReader(pTask->exec.pWalReader);
pTask->exec.pWalReader = NULL;
}
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);

View File

@ -501,9 +501,10 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
if (pTrans == NULL) {
ETaskStatus s = pSM->current.state;
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP &&
s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) {
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT &&
s != TASK_STATUS__READY) {
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,
GET_EVT_NAME(pSM->prev.evt));
}
// the pSM->prev.evt may be 0, so print string is not appropriate.
@ -521,11 +522,15 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
keepPrevInfo(pSM);
// repeat pause will not overwrite the previous pause state
if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) {
keepPrevInfo(pSM);
pSM->current = pTrans->next;
} else {
stDebug("s-task:%s repeat pause evt recv, not update prev status", id);
}
pSM->current = pTrans->next;
pSM->pActiveTrans = NULL;
// todo remove it
// todo: handle the error code
// on success callback, add into lock if necessary, or maybe we should add an option for this?

View File

@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void*
}
}
stDebug("vgId:%d start %s tmr succ", vgId, pMsg);
stTrace("vgId:%d start %s tmr succ", vgId, pMsg);
}
void streamTmrStop(tmr_h tmrId) {

View File

@ -24,7 +24,6 @@ extern "C" {
#define TIMER_MAX_MS 0x7FFFFFFF
#define PING_TIMER_MS 5000
#define HEARTBEAT_TICK_NUM 20
typedef struct SSyncEnv {
uint8_t isStart;

View File

@ -977,9 +977,10 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
pData->logicClock = pSyncTimer->logicClock;
pData->execTime = tsNow + pSyncTimer->timerMS;
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
pData->destId.addr, pSyncTimer->timerMS);
TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
syncEnv()->pTimerManager, &pSyncTimer->pTimer));
} else {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
@ -2711,7 +2712,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
return;
}
sTrace("vgId:%d, eq peer hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid, pData->destId.addr);
sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
pData->destId.addr);
if (pSyncNode->totalReplicaNum > 1) {
int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
@ -2753,13 +2755,12 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
if (ret != 0) {
sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
}
} else {
}
if (syncIsInit()) {
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM,
(void*)hbDataRid, syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) {
sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) {
sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
syncNodeRelease(pSyncNode);
syncHbTimerDataRelease(pData);

View File

@ -1446,6 +1446,9 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
return ret;
}
ofpCell = tdbPageGetCell(ofp, 0);
if (ofpCell == NULL) {
return TSDB_CODE_INVALID_DATA_FMT;
}
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;

View File

@ -282,6 +282,17 @@ static int32_t walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList)
}
static void walAlignVersions(SWal* pWal) {
if (pWal->cfg.committed > 0 && pWal->cfg.committed != pWal->vers.snapshotVer) {
wWarn("vgId:%d, snapshotVer:%" PRId64 " in wal is different from commited:%" PRId64
". in vnode/mnode. align with it.",
pWal->cfg.vgId, pWal->vers.snapshotVer, pWal->cfg.committed);
pWal->vers.snapshotVer = pWal->cfg.committed;
}
if (pWal->vers.snapshotVer < 0 && pWal->vers.firstVer > 0) {
wWarn("vgId:%d, snapshotVer:%" PRId64 " in wal is an invalid value. align it with firstVer:%" PRId64 ".",
pWal->cfg.vgId, pWal->vers.snapshotVer, pWal->vers.firstVer);
pWal->vers.snapshotVer = pWal->vers.firstVer;
}
if (pWal->vers.firstVer > pWal->vers.snapshotVer + 1) {
wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 " + 1. align with it.", pWal->cfg.vgId,
pWal->vers.firstVer, pWal->vers.snapshotVer);
@ -400,6 +411,17 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
void printFileSet(SArray* fileSet) {
int32_t sz = taosArrayGetSize(fileSet);
for (int32_t i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(fileSet, i);
wInfo("firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileSize:%" PRId64 ", syncedOffset:%" PRId64 ", createTs:%" PRId64
", closeTs:%" PRId64,
pFileInfo->firstVer, pFileInfo->lastVer, pFileInfo->fileSize, pFileInfo->syncedOffset, pFileInfo->createTs,
pFileInfo->closeTs);
}
}
int32_t walCheckAndRepairMeta(SWal* pWal) {
// load log files, get first/snapshot/last version info
int32_t code = 0;
@ -460,6 +482,10 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
taosArraySort(actualLog, compareWalFileInfo);
wInfo("vgId:%d, wal path:%s, actual log file num:%d", pWal->cfg.vgId, pWal->path,
(int32_t)taosArrayGetSize(actualLog));
printFileSet(actualLog);
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(actualLog);
int64_t firstVerPrev = pWal->vers.firstVer;
@ -474,6 +500,10 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
TAOS_RETURN(code);
}
wInfo("vgId:%d, wal path:%s, meta log file num:%d", pWal->cfg.vgId, pWal->path,
(int32_t)taosArrayGetSize(pWal->fileInfoSet));
printFileSet(pWal->fileInfoSet);
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
// scan and determine the lastVer
@ -533,6 +563,7 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
// repair ts of files
TAOS_CHECK_RETURN(walRepairLogFileTs(pWal, &updateMeta));
printFileSet(pWal->fileInfoSet);
// update meta file
if (updateMeta) {
TAOS_CHECK_RETURN(walSaveMeta(pWal));
@ -1124,6 +1155,10 @@ int32_t walLoadMeta(SWal* pWal) {
(void)taosCloseFile(&pFile);
taosMemoryFree(buf);
wInfo("vgId:%d, load meta file: %s, fileInfoSet size:%d", pWal->cfg.vgId, fnameStr,
(int32_t)taosArrayGetSize(pWal->fileInfoSet));
printFileSet(pWal->fileInfoSet);
TAOS_RETURN(code);
}

View File

@ -91,7 +91,8 @@ static int32_t walInitLock(SWal *pWal) {
}
SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = taosMemoryCalloc(1, sizeof(SWal));
int32_t code = 0;
SWal *pWal = taosMemoryCalloc(1, sizeof(SWal));
if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
@ -160,17 +161,20 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeHead.magic = WAL_MAGIC;
// load meta
if (walLoadMeta(pWal) < 0) {
wInfo("vgId:%d, failed to load meta since %s", pWal->cfg.vgId, tstrerror(terrno));
code = walLoadMeta(pWal);
if (code < 0) {
wWarn("vgId:%d, failed to load meta since %s", pWal->cfg.vgId, tstrerror(code));
}
if (walCheckAndRepairMeta(pWal) < 0) {
wError("vgId:%d, cannot open wal since repair meta file failed", pWal->cfg.vgId);
code = walCheckAndRepairMeta(pWal);
if (code < 0) {
wError("vgId:%d, cannot open wal since repair meta file failed since %s", pWal->cfg.vgId, tstrerror(code));
goto _err;
}
if (walCheckAndRepairIdx(pWal) < 0) {
wError("vgId:%d, cannot open wal since repair idx file failed", pWal->cfg.vgId);
code = walCheckAndRepairIdx(pWal);
if (code < 0) {
wError("vgId:%d, cannot open wal since repair idx file failed since %s", pWal->cfg.vgId, tstrerror(code));
goto _err;
}

View File

@ -127,6 +127,7 @@ class WalRetentionEnv : public ::testing::Test {
SWalCfg cfg;
cfg.rollPeriod = -1;
cfg.segSize = -1;
cfg.committed =-1;
cfg.retentionPeriod = -1;
cfg.retentionSize = 0;
cfg.rollPeriod = 0;

View File

@ -200,6 +200,7 @@ void* taosArrayPop(SArray* pArray) {
void* taosArrayGet(const SArray* pArray, size_t index) {
if (NULL == pArray) {
terrno = TSDB_CODE_INVALID_PARA;
uError("failed to return value from array of null ptr");
return NULL;
}

View File

@ -13,6 +13,7 @@ from frame.srvCtl import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
from frame import epath
# from frame.server.dnodes import *
# from frame.server.cluster import *
@ -20,7 +21,9 @@ from frame.autogen import *
class TDTestCase(TBase):
def init(self, conn, logSql, replicaVar=1):
updatecfgDict = {'dDebugFlag':131}
super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1")
self.valgrind = 0
self.db = "test"
self.stb = "meters"
@ -50,9 +53,36 @@ class TDTestCase(TBase):
tdSql.error("create encrypt_key '12345678abcdefghi'")
tdSql.error("create database test ENCRYPT_ALGORITHM 'sm4'")
def recreate_dndoe_encrypt_key(self):
"""
Description: From the jira TS-5507, the encrypt key can be recreated.
create:
2024-09-23 created by Charles
update:
None
"""
# taosd path
taosd_path = epath.binPath()
tdLog.info(f"taosd_path: {taosd_path}")
# dnode2 path
dndoe2_path = tdDnodes.getDnodeDir(2)
dnode2_data_path = os.sep.join([dndoe2_path, "data"])
dnode2_cfg_path = os.sep.join([dndoe2_path, "cfg"])
tdLog.info(f"dnode2_path: {dnode2_data_path}")
# stop dnode2
tdDnodes.stoptaosd(2)
tdLog.info("stop dndoe2")
# delete dndoe2 data
cmd = f"rm -rf {dnode2_data_path}"
os.system(cmd)
# recreate the encrypt key for dnode2
os.system(f"{os.sep.join([taosd_path, 'taosd'])} -y '1234567890' -c {dnode2_cfg_path}")
tdLog.info("test case: recreate the encrypt key for dnode2 passed")
def run(self):
self.create_encrypt_db_error()
self.create_encrypt_db()
self.recreate_dndoe_encrypt_key()
def stop(self):
tdSql.close()

View File

@ -10,7 +10,7 @@
# army-test
#
,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py
,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py -N 3 -M 3
,,n,army,python3 ./test.py -f storage/s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py

View File

@ -907,7 +907,7 @@ class TDTestCase:
## {. . .}
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
tdSql.checkRows(13)
tdSql.checkRows(12)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 5)
tdSql.checkData(2, 0, 10)
@ -920,7 +920,6 @@ class TDTestCase:
tdSql.checkData(9, 0, 15)
tdSql.checkData(10, 0, 15)
tdSql.checkData(11, 0, 15)
tdSql.checkData(12, 0, None)
## {} ...
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:01', '2020-02-01 00:00:04') every(1s) fill(next)")
@ -958,12 +957,10 @@ class TDTestCase:
## ..{.}
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:13', '2020-02-01 00:00:17') every(1s) fill(next)")
tdSql.checkRows(5)
tdSql.checkRows(3)
tdSql.checkData(0, 0, 15)
tdSql.checkData(1, 0, 15)
tdSql.checkData(2, 0, 15)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
## ... {}
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(next)")
@ -1275,7 +1272,7 @@ class TDTestCase:
tdSql.checkData(8, 1, True)
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
tdSql.checkRows(13)
tdSql.checkRows(12)
tdSql.checkCols(3)
tdSql.checkData(0, 0, '2020-02-01 00:00:04.000')
@ -1290,7 +1287,6 @@ class TDTestCase:
tdSql.checkData(9, 0, '2020-02-01 00:00:13.000')
tdSql.checkData(10, 0, '2020-02-01 00:00:14.000')
tdSql.checkData(11, 0, '2020-02-01 00:00:15.000')
tdSql.checkData(12, 0, '2020-02-01 00:00:16.000')
tdSql.checkData(0, 1, True)
tdSql.checkData(1, 1, False)
@ -1304,7 +1300,6 @@ class TDTestCase:
tdSql.checkData(9, 1, True)
tdSql.checkData(10, 1, True)
tdSql.checkData(11, 1, False)
tdSql.checkData(12, 1, True)
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-01 00:00:15') every(2s) fill(next)")
tdSql.checkRows(6)
@ -1682,13 +1677,9 @@ class TDTestCase:
## | . | { | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(next)")
tdSql.checkRows(6)
tdSql.checkRows(2)
tdSql.checkData(0, 0, 15)
tdSql.checkData(1, 0, 15)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
tdSql.checkData(5, 0, None)
# test fill linear
@ -2741,7 +2732,7 @@ class TDTestCase:
tdSql.checkData(4, i, 15)
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(next)")
tdSql.checkRows(5)
tdSql.checkRows(3)
tdSql.checkCols(4)
for i in range (tdSql.queryCols):
@ -2837,7 +2828,7 @@ class TDTestCase:
# test fill next
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(next)")
tdSql.checkRows(19)
tdSql.checkRows(18)
tdSql.checkCols(3)
tdSql.checkData(0, 0, '2020-02-02 00:00:00.000')
@ -2860,7 +2851,6 @@ class TDTestCase:
tdSql.checkData(15, 2, None)
tdSql.checkData(16, 2, None)
tdSql.checkData(17, 2, None)
tdSql.checkData(18, 2, None)
tdSql.checkData(17, 0, '2020-02-02 00:00:17.000')
@ -3091,7 +3081,7 @@ class TDTestCase:
# test fill linear
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(18)
tdSql.checkRows(17)
tdSql.checkCols(3)
tdSql.checkData(0, 0, '2020-02-02 00:00:01.000')
@ -3113,9 +3103,8 @@ class TDTestCase:
tdSql.checkData(14, 2, None)
tdSql.checkData(15, 2, None)
tdSql.checkData(16, 2, None)
tdSql.checkData(17, 2, None)
tdSql.checkData(17, 0, '2020-02-02 00:00:18.000')
tdSql.checkData(16, 0, '2020-02-02 00:00:17.000')
tdLog.printNoPrefix("==========step13:test error cases")
@ -3231,7 +3220,7 @@ class TDTestCase:
tdSql.checkData(17, 1, True)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(19)
tdSql.checkRows(18)
tdSql.checkData(0, 0, '2020-02-01 00:00:00.000')
tdSql.checkData(0, 1, True)
@ -3254,12 +3243,9 @@ class TDTestCase:
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 17)
tdSql.checkData(17, 2, 17)
tdSql.checkData(18, 2, None)
tdSql.checkData(17, 0, '2020-02-01 00:00:17.000')
tdSql.checkData(17, 1, False)
tdSql.checkData(18, 0, '2020-02-01 00:00:18.000')
tdSql.checkData(18, 1, True)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
@ -3376,24 +3362,24 @@ class TDTestCase:
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(57)
for i in range(0, 19):
tdSql.checkRows(48)
for i in range(0, 14):
tdSql.checkData(i, 0, 'ctb1')
for i in range(19, 38):
for i in range(14, 30):
tdSql.checkData(i, 0, 'ctb2')
for i in range(38, 57):
for i in range(30, 48):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(18, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(13, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(19, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(37, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(29, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(38, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(56, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(30, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(47, 1, '2020-02-01 00:00:17.000')
for i in range(0, 2):
tdSql.checkData(i, 3, 1)
@ -3404,33 +3390,24 @@ class TDTestCase:
for i in range(8, 14):
tdSql.checkData(i, 3, 13)
for i in range(14, 19):
tdSql.checkData(i, 3, None)
for i in range(19, 23):
for i in range(14, 18):
tdSql.checkData(i, 3, 3)
for i in range(23, 29):
for i in range(18, 24):
tdSql.checkData(i, 3, 9)
for i in range(29, 35):
for i in range(24, 30):
tdSql.checkData(i, 3, 15)
for i in range(35, 38):
tdSql.checkData(i, 3, None)
for i in range(38, 44):
for i in range(30, 36):
tdSql.checkData(i, 3, 5)
for i in range(44, 50):
for i in range(36, 42):
tdSql.checkData(i, 3, 11)
for i in range(50, 56):
for i in range(42, 48):
tdSql.checkData(i, 3, 17)
for i in range(56, 57):
tdSql.checkData(i, 3, None)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
@ -3473,7 +3450,7 @@ class TDTestCase:
tdSql.checkRows(90)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(171)
tdSql.checkRows(90)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(9)
@ -3490,7 +3467,7 @@ class TDTestCase:
tdSql.checkRows(48)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(57)
tdSql.checkRows(48)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
@ -4386,7 +4363,7 @@ class TDTestCase:
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{tbname_null} range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
tdSql.checkRows(11)
tdSql.checkRows(9)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, False)
@ -4396,8 +4373,6 @@ class TDTestCase:
tdSql.checkData(6, 1, True)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, False)
tdSql.checkData(9, 1, True)
tdSql.checkData(10, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 3)
@ -4408,13 +4383,11 @@ class TDTestCase:
tdSql.checkData(6, 2, 8)
tdSql.checkData(7, 2, 8)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, None)
tdSql.checkData(10, 2, None)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{tbname_null} where c0 is not null range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
tdSql.checkRows(11)
tdSql.checkRows(9)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, False)
@ -4424,9 +4397,6 @@ class TDTestCase:
tdSql.checkData(6, 1, True)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, False)
tdSql.checkData(9, 1, True)
tdSql.checkData(10, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 3)
@ -4437,8 +4407,6 @@ class TDTestCase:
tdSql.checkData(6, 2, 8)
tdSql.checkData(7, 2, 8)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, None)
tdSql.checkData(10, 2, None)
# super table
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
@ -4475,7 +4443,7 @@ class TDTestCase:
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(9)
tdSql.checkRows(8)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
@ -4484,7 +4452,6 @@ class TDTestCase:
tdSql.checkData(5, 1, True)
tdSql.checkData(6, 1, False)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 9)
@ -4494,12 +4461,11 @@ class TDTestCase:
tdSql.checkData(5, 2, 13)
tdSql.checkData(6, 2, 13)
tdSql.checkData(7, 2, 15)
tdSql.checkData(8, 2, None)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(9)
tdSql.checkRows(8)
tdSql.checkData(0, 1, False)
tdSql.checkData(1, 1, True)
tdSql.checkData(2, 1, True)
@ -4508,7 +4474,6 @@ class TDTestCase:
tdSql.checkData(5, 1, True)
tdSql.checkData(6, 1, False)
tdSql.checkData(7, 1, False)
tdSql.checkData(8, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 9)
@ -4518,37 +4483,36 @@ class TDTestCase:
tdSql.checkData(5, 2, 13)
tdSql.checkData(6, 2, 13)
tdSql.checkData(7, 2, 15)
tdSql.checkData(8, 2, None)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(18)
for i in range(0, 9):
tdSql.checkRows(15)
for i in range(0, 7):
tdSql.checkData(i, 0, 'ctb1_null')
for i in range(9, 18):
for i in range(7, 15):
tdSql.checkData(i, 0, 'ctb2_null')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(8, 1, '2020-02-01 00:00:17.000')
tdSql.checkData(6, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(9, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(17, 1, '2020-02-01 00:00:17.000')
tdSql.checkData(7, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:15.000')
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
tdSql.checkRows(18)
for i in range(0, 9):
tdSql.checkRows(15)
for i in range(0, 7):
tdSql.checkData(i, 0, 'ctb1_null')
for i in range(9, 18):
for i in range(7, 15):
tdSql.checkData(i, 0, 'ctb2_null')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(8, 1, '2020-02-01 00:00:17.000')
tdSql.checkData(6, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(9, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(17, 1, '2020-02-01 00:00:17.000')
tdSql.checkData(7, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:15.000')
# fill linear
# normal table

View File

@ -0,0 +1,66 @@
import random
import string
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
import numpy as np
class TDTestCase:
updatecfgDict = {'slowLogThresholdTest': ''}
updatecfgDict["slowLogThresholdTest"] = 0
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def getPath(self, tool="taosBenchmark"):
if (platform.system().lower() == 'windows'):
tool = tool + ".exe"
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if ((tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
tdLog.exit("taosBenchmark not found!")
return
else:
tdLog.info("taosBenchmark found in %s" % paths[0])
return paths[0]
def taosBenchmark(self, param):
binPath = self.getPath()
cmd = f"{binPath} {param}"
tdLog.info(cmd)
os.system(cmd)
def testSlowQuery(self):
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
sql = "select count(*) from db.meters"
for i in range(10):
tdSql.query(sql)
tdSql.checkData(0, 0, 2 * 1000000)
def run(self):
self.testSlowQuery()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -925,3 +925,4 @@ python3 ./test.py -f 99-TDcase/TD-20582.py
python3 ./test.py -f 5-taos-tools/taosbenchmark/insertMix.py -N 3
python3 ./test.py -f 5-taos-tools/taosbenchmark/stt.py -N 3
python3 ./test.py -f eco-system/meta/database/keep_time_offset.py
python3 ./test.py -f 2-query/slow_query_basic.py

View File

@ -16,14 +16,14 @@
{
"dbinfo": {
"name": "dbrate",
"drop": "yes",
"vgroups": 2
"vgroups": 1,
"drop": "yes"
},
"super_tables": [
{
"name": "meters",
"child_table_exists": "no",
"childtable_count": 10,
"childtable_count": 1,
"childtable_prefix": "d",
"insert_mode": "@STMT_MODE",
"interlace_rows": @INTERLACE_MODE,

View File

@ -198,16 +198,20 @@ def findContextValue(context, label):
def writeTemplateInfo(resultFile):
# create info
context = readFileContext(templateFile)
context = readFileContext(templateFile)
vgroups = findContextValue(context, "vgroups")
childCount = findContextValue(context, "childtable_count")
insertRows = findContextValue(context, "insert_rows")
line = f"vgroups = {vgroups}\nchildtable_count = {childCount}\ninsert_rows = {insertRows}\n\n"
bindVGroup = findContextValue(context, "thread_bind_vgroup")
nThread = findContextValue(context, "thread_count")
if bindVGroup.lower().find("yes") != -1:
nThread = vgroups
line = f"thread_bind_vgroup = {bindVGroup}\nvgroups = {vgroups}\nchildtable_count = {childCount}\ninsert_rows = {insertRows}\ninsertThreads = {nThread} \n\n"
print(line)
appendFileContext(resultFile, line)
def totalCompressRate(stmt, interlace, resultFile, writeSpeed, querySpeed):
def totalCompressRate(stmt, interlace, resultFile, spent, spentReal, writeSpeed, writeReal, min, avg, p90, p99, max, querySpeed):
global Number
# flush
command = 'taos -s "flush database dbrate;"'
@ -220,7 +224,7 @@ def totalCompressRate(stmt, interlace, resultFile, writeSpeed, querySpeed):
# read compress rate
command = 'taos -s "show table distributed dbrate.meters\G;"'
rets = runRetList(command)
print(rets)
#print(rets)
str1 = rets[5]
arr = str1.split(" ")
@ -234,46 +238,88 @@ def totalCompressRate(stmt, interlace, resultFile, writeSpeed, querySpeed):
str2 = arr[6]
pos = str2.find("=[")
rate = str2[pos+2:]
print("rate =" + rate)
# total data file size
#dataSize = getFolderSize(f"{dataDir}/vnode/")
#dataSizeMB = int(dataSize/1024/1024)
# appand to file
# %("No", "stmtMode", "interlaceRows", "spent", "spent-real", "writeSpeed", "write-real", "query-QPS", "dataSize", "rate")
Number += 1
context = "%10s %10s %15s %10s %10s %30s %15s\n"%( Number, stmt, interlace, str(totalSize)+" MB", rate+"%", writeSpeed + " Records/second", querySpeed)
'''
context = "%2s %6s %10s %10s %10s %15s %15s %16s %16s %16s %16s %16s %8s %8s %8s\n"%(
Number, stmt, interlace, spent + "s", spentReal + "s", writeSpeed + " rows/s", writeReal + " rows/s",
min, avg, p90, p99, max,
querySpeed, str(totalSize) + " MB", rate + "%")
'''
context = "%2s %8s %10s %10s %16s %16s %12s %12s %12s %12s %12s %12s %10s %10s %10s\n"%(
Number, stmt, interlace, spent + "s", spentReal + "s", writeSpeed + "r/s", writeReal + "r/s",
min, avg, p90, p99, max + "ms",
querySpeed, str(totalSize) + " MB", rate + "%")
showLog(context)
appendFileContext(resultFile, context)
def cutEnd(line, start, endChar):
pos = line.find(endChar, start)
if pos == -1:
return line[start:]
return line[start : pos]
def findValue(context, pos, key, endChar,command):
pos = context.find(key, pos)
if pos == -1:
print(f"error, run command={command} output not found \"{key}\" keyword. context={context}")
exit(1)
pos += len(key)
value = cutEnd(context, pos, endChar)
return (value, pos)
def testWrite(jsonFile):
command = f"taosBenchmark -f {jsonFile}"
output, context = run(command, 60000)
print(context)
# SUCC: Spent 0.960248 (real 0.947154) seconds to insert rows: 100000 with 1 thread(s) into dbrate 104139.76 (real 105579.45) records/second
# find second real
pos = context.find("(real ")
# spent
key = "Spent "
pos = -1
pos1 = 0
while pos1 != -1: # find last "Spent "
pos1 = context.find(key, pos1)
if pos1 != -1:
pos = pos1 # update last found
pos1 += len(key)
if pos == -1:
print(f"error, run command={command} output not found first \"(real\" keyword. error={context}")
print(f"error, run command={command} output not found \"{key}\" keyword. context={context}")
exit(1)
pos = context.find("(real ", pos + 5)
pos += len(key)
spent = cutEnd(context, pos, ".")
# spent-real
spentReal, pos = findValue(context, pos, "(real ", ".", command)
# writeSpeed
key = "into "
pos = context.find(key, pos)
if pos == -1:
print(f"error, run command={command} output not found second \"(real\" keyword. error={context}")
exit(1)
pos += 5
length = len(context)
while pos < length and context[pos] == ' ':
pos += 1
end = context.find(".", pos)
if end == -1:
print(f"error, run command={command} output not found second \".\" keyword. error={context}")
print(f"error, run command={command} output not found \"{key}\" keyword. context={context}")
exit(1)
pos += len(key)
writeSpeed, pos = findValue(context, pos, " ", ".", command)
# writeReal
writeReal, pos = findValue(context, pos, "(real ", ".", command)
speed = context[pos: end]
#print(f"write pos ={pos} end={end} speed={speed}\n output={context} \n")
return speed
# delay
min, pos = findValue(context, pos, "min: ", ",", command)
avg, pos = findValue(context, pos, "avg: ", ",", command)
p90, pos = findValue(context, pos, "p90: ", ",", command)
p99, pos = findValue(context, pos, "p99: ", ",", command)
max, pos = findValue(context, pos, "max: ", "ms", command)
return (spent, spentReal, writeSpeed, writeReal, min, avg, p90, p99, max)
def testQuery():
command = f"taosBenchmark -f json/query.json"
@ -308,13 +354,13 @@ def doTest(stmt, interlace, resultFile):
# run taosBenchmark
t1 = time.time()
writeSpeed = testWrite(jsonFile)
spent, spentReal, writeSpeed, writeReal, min, avg, p90, p99, max = testWrite(jsonFile)
t2 = time.time()
# total write speed
querySpeed = testQuery()
# total compress rate
totalCompressRate(stmt, interlace, resultFile, writeSpeed, querySpeed)
totalCompressRate(stmt, interlace, resultFile, spent, spentReal, writeSpeed, writeReal, min, avg, p90, p99, max, querySpeed)
def main():
@ -333,7 +379,17 @@ def main():
# json info
writeTemplateInfo(resultFile)
# head
context = "\n%10s %10s %15s %10s %10s %30s %15s\n"%("No", "stmtMode", "interlaceRows", "dataSize", "rate", "writeSpeed", "query-QPS")
'''
context = "%3s %8s %10s %10s %10s %15s %15s %10s %10s %10s %10s %10s %8s %8s %8s\n"%(
"No", "stmtMode", "interlace", "spent", "spent-real", "writeSpeed", "write-real",
"min", "avg", "p90", "p99", "max",
"query-QPS", "dataSize", "rate")
'''
context = "%2s %8s %10s %10s %16s %16s %12s %12s %12s %12s %12s %12s %10s %10s %10s\n"%(
"No", "stmtMode", "interlace", "spent", "spent-real", "writeSpeed", "write-real",
"min", "avg", "p90", "p99", "max",
"query-QPS", "dataSize", "rate")
appendFileContext(resultFile, context)

View File

@ -134,8 +134,6 @@ def getMatch(datatype, algo):
def generateJsonFile(algo):
print(f"doTest algo: {algo} \n")
# replace datatype
context = readFileContext(templateFile)
# replace compress
@ -192,8 +190,6 @@ def findContextValue(context, label):
ends = [',','}',']', 0]
while context[end] not in ends:
end += 1
print(f"start = {start} end={end}\n")
return context[start:end]
@ -281,10 +277,10 @@ def testQuery():
# INFO: Spend 6.7350 second completed total queries: 10, the QPS of all threads: 1.485
speed = None
for i in range(20, len(lines)):
for i in range(0, len(lines)):
# find second real
context = lines[i]
pos = context.find("the QPS of all threads:")
context = lines[26]
if pos == -1 :
continue
pos += 24
@ -302,7 +298,6 @@ def doTest(algo, resultFile):
print(f"doTest algo: {algo} \n")
#cleanAndStartTaosd()
# json
jsonFile = generateJsonFile(algo)