Merge remote-tracking branch 'origin/3.0' into fix/tsim
commit f249bae82d
@@ -249,13 +249,6 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
   ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
    (_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)

-#define NEED_SCHEDULER_RETRY_ERROR(_code) \
-  (NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
-   (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
-
-
-
-
 #define REQUEST_TOTAL_EXEC_TIMES 2

 #define qFatal(...) \
@@ -623,6 +623,7 @@ int32_t* taosGetErrno();
 #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
 #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
 #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
+#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)

 //index
 #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
@@ -786,6 +786,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
 void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
   SRequestObj* pRequest = (SRequestObj*)param;
   pRequest->code = code;
+  pRequest->body.resInfo.execRes = pResult->res;

   if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
       TDMT_VND_CREATE_TABLE == pRequest->type) {
@@ -797,6 +798,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
     }
   }

+  taosMemoryFree(pResult);
+
   tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
            tstrerror(code), pRequest->requestId);

@@ -556,7 +556,7 @@ typedef struct {
   int64_t uid;
   int8_t  status;
   // config
-  int8_t  dropPolicy;
+  int8_t  igExpired;
   int8_t  trigger;
   int64_t triggerParam;
   int64_t watermark;
@@ -28,7 +28,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
   if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
   if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;

-  if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
+  if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
   if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
   if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
   if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
@@ -73,7 +73,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
   if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
   if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;

-  if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
+  if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
   if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
   if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
   if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
@@ -248,7 +248,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
   pObj->status = 0;

   // TODO
-  pObj->dropPolicy = 0;
+  pObj->igExpired = pCreate->igExpired;
   pObj->trigger = pCreate->triggerType;
   pObj->triggerParam = pCreate->maxDelay;
   pObj->watermark = pCreate->watermark;
@@ -301,6 +301,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
       .streamQuery = true,
       .triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
       .watermark = pObj->watermark,
+      .igExpired = pObj->igExpired,
   };

   // using ast and param to build physical plan
@@ -31,6 +31,8 @@ target_sources(
         "src/sma/smaEnv.c"
         "src/sma/smaUtil.c"
         "src/sma/smaOpen.c"
+        "src/sma/smaCommit.c"
+        "src/sma/smaSnapshot.c"
         "src/sma/smaRollup.c"
         "src/sma/smaTimeRange.c"

@@ -92,8 +92,9 @@ enum {
   TASK_TRIGGER_STAT_INIT = 0,
   TASK_TRIGGER_STAT_ACTIVE = 1,
   TASK_TRIGGER_STAT_INACTIVE = 2,
-  TASK_TRIGGER_STAT_CANCELLED = 3,
-  TASK_TRIGGER_STAT_FINISHED = 4,
+  TASK_TRIGGER_STAT_PAUSED = 3,
+  TASK_TRIGGER_STAT_CANCELLED = 4,
+  TASK_TRIGGER_STAT_FINISHED = 5,
 };
 void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
 void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
@@ -214,25 +215,22 @@ struct STFInfo {
 };

 struct STFile {
-  STFInfo   info;
-  STfsFile  f;
-  TdFilePtr pFile;
   uint8_t   state;
+  STFInfo   info;
+  char     *fname;
+  TdFilePtr pFile;
 };

-#define TD_TFILE_F(tf) (&((tf)->f))
 #define TD_TFILE_PFILE(tf) ((tf)->pFile)
 #define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
-#define TD_TFILE_FULL_NAME(tf) (TD_TFILE_F(tf)->aname)
-#define TD_TFILE_REL_NAME(tf) (TD_TFILE_F(tf)->rname)
+#define TD_TFILE_FULL_NAME(tf) ((tf)->fname)
 #define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL)
 #define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf))
 #define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL)
 #define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s))
-#define TD_TFILE_DID(tf) (TD_TFILE_F(tf)->did)

-int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname);
-int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType);
+int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname);
+int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType);
 int32_t tdOpenTFile(STFile *pTFile, int flags);
 int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
 int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
@@ -244,8 +242,10 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
 int32_t tdUpdateTFileHeader(STFile *pTFile);
 void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
 void tdCloseTFile(STFile *pTFile);
+void tdDestroyTFile(STFile *pTFile);

-void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName);
+void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName);
+void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName);

 #ifdef __cplusplus
 }
@@ -165,6 +165,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
 int32_t smaOpen(SVnode* pVnode);
 int32_t smaCloseEnv(SSma* pSma);
 int32_t smaCloseEx(SSma* pSma);
+int32_t smaPreCommit(SSma* pSma);
+int32_t smaCommit(SSma* pSma);
+int32_t smaPostCommit(SSma* pSma);

 int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
 int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sma.h"
+
+static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma);
+static int32_t tdProcessRSmaCommitImpl(SSma *pSma);
+static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma);
+
+/**
+ * @brief Only applicable to Rollup SMA
+ *
+ * @param pSma
+ * @return int32_t
+ */
+int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); }
+
+/**
+ * @brief Only applicable to Rollup SMA
+ *
+ * @param pSma
+ * @return int32_t
+ */
+int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); }
+
+/**
+ * @brief Only applicable to Rollup SMA
+ *
+ * @param pSma
+ * @return int32_t
+ */
+int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); }
+
+/**
+ * @brief pre-commit for rollup sma.
+ *  1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
+ *  2) perform persist task for qTaskInfo
+ *  3) wait all triggered fetch tasks finished
+ *  4) set trigger stat of rsma timer TASK_TRIGGER_STAT_ACTIVE.
+ *  5) finish
+ *
+ * @param pSma
+ * @return int32_t
+ */
+static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
+  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
+  if (!pSmaEnv) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SSmaStat  *pStat = SMA_ENV_STAT(pSmaEnv);
+  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
+
+  // step 1
+  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
+
+  // step 2
+
+  return TSDB_CODE_SUCCESS;
+}
+
+/**
+ * @brief commit for rollup sma
+ *
+ * @param pSma
+ * @return int32_t
+ */
+static int32_t tdProcessRSmaCommitImpl(SSma *pSma) {
+  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
+  if (!pSmaEnv) {
+    return TSDB_CODE_SUCCESS;
+  }
+  return TSDB_CODE_SUCCESS;
+}
+
+/**
+ * @brief post-commit for rollup sma
+ *  1) clean up the outdated qtaskinfo files
+ *
+ * @param pSma
+ * @return int32_t
+ */
+static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
+  SVnode *pVnode = pSma->pVnode;
+
+  if (!VND_IS_RSMA(pVnode)) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  int64_t       committed = pVnode->state.committed;
+  TdDirPtr      pDir = NULL;
+  TdDirEntryPtr pDirEntry = NULL;
+  char          dir[TSDB_FILENAME_LEN];
+  char          bname[TSDB_FILENAME_LEN];
+  const char   *pattern = "^v[0-9]+qtaskinfo\\.ver([0-9]+)?$";
+  regex_t       regex;
+
+  tdGetVndDirName(TD_VID(pVnode), VNODE_RSMA_DIR, dir);
+
+  // Resource allocation and init
+  regcomp(&regex, pattern, REG_EXTENDED);
+
+  if ((pDir = taosOpenDir(dir)) == NULL) {
+    terrno = TAOS_SYSTEM_ERROR(errno);
+    smaWarn("rsma post-commit open dir %s failed since %s", dir, terrstr());
+    return TSDB_CODE_FAILED;
+  }
+
+  regmatch_t regMatch[2];
+  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
+    char *entryName = taosGetDirEntryName(pDirEntry);
+    if (!entryName) {
+      continue;
+    }
+    char *fileName = taosDirEntryBaseName(entryName);
+    int   code = regexec(&regex, bname, 2, regMatch, 0);
+
+    if (code == 0) {
+      // match
+      printf("match 0 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[0].rm_so));
+      printf("match 1 = %s\n", (char *)POINTER_SHIFT(fileName, regMatch[1].rm_so));
+    } else if (code == REG_NOMATCH) {
+      // not match
+      smaInfo("rsma post-commit not match %s", fileName);
+      continue;
+    } else {
+      // has other error
+      terrno = TAOS_SYSTEM_ERROR(code);
+      smaWarn("rsma post-commit regexec failed since %s", terrstr());
+
+      taosCloseDir(&pDir);
+      regfree(&regex);
+      return TSDB_CODE_FAILED;
+    }
+  }
+  taosCloseDir(&pDir);
+  return TSDB_CODE_SUCCESS;
+}
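Note: the post-commit cleanup in the new smaCommit.c keys off the file-name pattern "^v[0-9]+qtaskinfo\.ver([0-9]+)?$". A minimal stand-alone sketch of that matching step (hypothetical file names, plain POSIX regex, no TDengine wrappers) could look like this:

    #include <regex.h>
    #include <stdio.h>

    int main(void) {
      // same pattern as tdProcessRSmaPostCommitImpl: "v<vgId>qtaskinfo.ver" plus an optional version number
      const char *pattern = "^v[0-9]+qtaskinfo\\.ver([0-9]+)?$";
      const char *names[] = {"v2qtaskinfo.ver100", "v2qtaskinfo.ver99", "v2qtaskinfo.t"};  // hypothetical entries
      regex_t     regex;
      regmatch_t  m[2];
      if (regcomp(&regex, pattern, REG_EXTENDED) != 0) return 1;
      for (int i = 0; i < 3; ++i) {
        int code = regexec(&regex, names[i], 2, m, 0);
        printf("%-20s => %s\n", names[i], code == 0 ? "versioned qtaskinfo file" : "ignored");
      }
      regfree(&regex);
      return 0;
    }

Entries that match and carry a version older than the committed version are presumably the "outdated" files the post-commit step is meant to clean up.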
@@ -23,8 +23,7 @@ SSmaMgmt smaMgmt = {
     .smaRef = -1,
 };

-typedef enum { TD_QTASK_TMP_F = 0, TD_QTASK_CUR_F } TD_QTASK_FILE_T;
-static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
+#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"

 typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
 typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;

@@ -37,7 +36,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
 static void tdRSmaFetchTrigger(void *param, void *tmrId);
 static void tdRSmaPersistTrigger(void *param, void *tmrId);
 static void *tdRSmaPersistExec(void *param);
-static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName);
+static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName);

 static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
 static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
@@ -88,8 +87,8 @@ struct SRSmaQTaskInfoIter {
   int32_t nBufPos;
 };

-static void tdRSmaQTaskInfoGetFName(int32_t vgId, int8_t ftype, char *outputName) {
-  tdGetVndFileName(vgId, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName);
+static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) {
+  tdGetVndFileName(vgId, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
 }

 static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
@@ -493,7 +492,6 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
 }

 static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
-  ASSERT(pMsg != NULL);
   SSubmitMsgIter msgIter = {0};
   SSubmitBlk    *pBlock = NULL;
   SSubmitBlkIter blkIter = {0};
@@ -501,19 +499,26 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {

   terrno = TSDB_CODE_SUCCESS;

-  if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
+  if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
+    return -1;
+  }
   while (true) {
-    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
+    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
+      return -1;
+    }

     if (!pBlock) break;
     tdUidStorePut(pStore, msgIter.suid, NULL);
   }

-  if (terrno != TSDB_CODE_SUCCESS) return -1;
+  if (terrno != TSDB_CODE_SUCCESS) {
+    return -1;
+  }
   return 0;
 }

 static void tdDestroySDataBlockArray(SArray *pArray) {
+  // TODO
 #if 0
   for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
     SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
@@ -598,33 +603,54 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {

   pSma = RSMA_INFO_SMA(pItem->pRsmaInfo);

-  // if rsma trigger stat in cancelled or finished, not start fetch task anymore
+  // if rsma trigger stat in paused, cancelled or finished, not start fetch task
   int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
-  if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
-    taosReleaseRef(smaMgmt.smaRef, pItem->refId);
-    smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled",
-             SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
-    return;
+  switch (rsmaTriggerStat) {
+    case TASK_TRIGGER_STAT_PAUSED:
+    case TASK_TRIGGER_STAT_CANCELLED:
+    case TASK_TRIGGER_STAT_FINISHED: {
+      taosReleaseRef(smaMgmt.smaRef, pItem->refId);
+      smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled",
+               SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
+      return;
+    }
+    default:
+      break;
   }

   int8_t fetchTriggerStat =
       atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
-  if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
-    smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
-             pItem->level, pItem->pRsmaInfo->suid);
+  switch (fetchTriggerStat) {
+    case TASK_TRIGGER_STAT_ACTIVE: {
+      smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
+               pItem->level, pItem->pRsmaInfo->suid);

-    tdRefSmaStat(pSma, (SSmaStat *)pStat);
+      tdRefSmaStat(pSma, (SSmaStat *)pStat);

-    SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
-    qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
-    tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_BLOCK);
+      SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
+      qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
+      tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_BLOCK);

-    tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
-  } else {
-    smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
-             SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
+      tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
+    } break;
+    case TASK_TRIGGER_STAT_PAUSED: {
+      smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
+               SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
+    } break;
+    case TASK_TRIGGER_STAT_INACTIVE: {
+      smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
+               SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
+    } break;
+    case TASK_TRIGGER_STAT_INIT: {
+      smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma),
+               pItem->level, pItem->pRsmaInfo->suid);
+    } break;
+    default: {
+      smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown",
+              SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
+    } break;
   }

 _end:
   taosReleaseRef(smaMgmt.smaRef, pItem->refId);
 }
@@ -780,10 +806,10 @@ _err:
 static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
   SVnode *pVnode = pSma->pVnode;
   STFile  tFile = {0};
-  char    qTaskInfoFName[TSDB_FILENAME_LEN];
+  char    qTaskInfoFName[TSDB_FILENAME_LEN] = {0};

-  tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_TMP_F, qTaskInfoFName);
-  if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
+  tdRSmaQTaskInfoGetFName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName);
+  if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
     goto _err;
   }

@@ -799,17 +825,20 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
   if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
     tdRSmaQTaskInfoIterDestroy(&fIter);
     tdCloseTFile(&tFile);
+    tdDestroyTFile(&tFile);
     goto _err;
   }

   if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
     tdRSmaQTaskInfoIterDestroy(&fIter);
     tdCloseTFile(&tFile);
+    tdDestroyTFile(&tFile);
     goto _err;
   }

   tdRSmaQTaskInfoIterDestroy(&fIter);
   tdCloseTFile(&tFile);
+  tdDestroyTFile(&tFile);
   return TSDB_CODE_SUCCESS;
 _err:
   smaError("rsma restore, qtaskinfo reload failed since %s", terrstr());
@@ -931,19 +960,21 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
   }

   if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
-    ASSERT(0);
     return TSDB_CODE_FAILED;
   }

   if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) {
-    ASSERT(0);
     return TSDB_CODE_FAILED;
   }

   int32_t infoLen = 0;
   taosDecodeFixedI32(pIter->qBuf, &infoLen);
   if (infoLen > nBytes) {
-    ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE);
+    if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
+      terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
+      smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
+      return TSDB_CODE_FAILED;
+    }
     pIter->nAlloc = infoLen;
     void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
     if (!pBuf) {
@@ -955,12 +986,10 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
     nBytes = infoLen;

     if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) {
-      ASSERT(0);
       return TSDB_CODE_FAILED;
     }

     if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
-      ASSERT(0);
       return TSDB_CODE_FAILED;
     }
   }
@@ -977,7 +1006,6 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
     // block iter
     bool isFinish = false;
     if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
-      ASSERT(0);
       return TSDB_CODE_FAILED;
     }
     if (isFinish) {
@@ -989,6 +1017,7 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
     pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
     if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
       terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
+      smaError("restore rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
       return TSDB_CODE_FAILED;
     }

|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tdRSmaPersistExec(void *param) {
|
static int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) {
|
||||||
setThreadName("rsma-task-persist");
|
SSma *pSma = pRSmaStat->pSma;
|
||||||
SRSmaStat *pRSmaStat = param;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SSma *pSma = pRSmaStat->pSma;
|
int32_t vid = SMA_VID(pSma);
|
||||||
STfs *pTfs = pSma->pVnode->pTfs;
|
int64_t toffset = 0;
|
||||||
int32_t vid = SMA_VID(pSma);
|
bool isFileCreated = false;
|
||||||
int64_t toffset = 0;
|
|
||||||
bool isFileCreated = false;
|
|
||||||
|
|
||||||
if (TASK_TRIGGER_STAT_CANCELLED == atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||||
if (!infoHash) {
|
if (!infoHash) {
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STFile tFile = {0};
|
STFile tFile = {0};
|
||||||
|
@ -1074,9 +1097,13 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
|
|
||||||
if (!isFileCreated) {
|
if (!isFileCreated) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_F, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName);
|
||||||
tdInitTFile(&tFile, pTfs, qTaskInfoFName);
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
tdCreateTFile(&tFile, pTfs, true, -1);
|
goto _err;
|
||||||
|
}
|
||||||
|
if (tdCreateTFile(&tFile, true, -1) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
isFileCreated = true;
|
isFileCreated = true;
|
||||||
}
|
}
|
||||||
|
@ -1101,49 +1128,55 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
|
|
||||||
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
|
infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash);
|
||||||
}
|
}
|
||||||
_normal:
|
|
||||||
if (isFileCreated) {
|
if (isFileCreated) {
|
||||||
if (tdUpdateTFileHeader(&tFile) < 0) {
|
if (tdUpdateTFileHeader(&tFile) < 0) {
|
||||||
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
tdCloseTFile(&tFile);
|
|
||||||
tdRemoveTFile(&tFile);
|
|
||||||
goto _err;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
||||||
}
|
}
|
||||||
|
|
||||||
tdCloseTFile(&tFile);
|
tdCloseTFile(&tFile);
|
||||||
|
tdDestroyTFile(&tFile);
|
||||||
char newFName[TSDB_FILENAME_LEN];
|
|
||||||
strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN);
|
|
||||||
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]);
|
|
||||||
strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
|
|
||||||
if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) {
|
|
||||||
smaError("vgId:%d, rsma, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
|
||||||
goto _err;
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, rsma, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
if (isFileCreated) {
|
if (isFileCreated) {
|
||||||
tdRemoveTFile(&tFile);
|
tdRemoveTFile(&tFile);
|
||||||
|
tdDestroyTFile(&tFile);
|
||||||
}
|
}
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tdRSmaPersistExec(void *param) {
|
||||||
|
setThreadName("rsma-task-persist");
|
||||||
|
SRSmaStat *pRSmaStat = param;
|
||||||
|
SSma *pSma = pRSmaStat->pSma;
|
||||||
|
|
||||||
|
int8_t triggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat));
|
||||||
|
|
||||||
|
if (TASK_TRIGGER_STAT_CANCELLED == triggerStat || TASK_TRIGGER_STAT_PAUSED == triggerStat) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
// execution
|
||||||
|
tdRSmaPersistExecImpl(pRSmaStat);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||||
TASK_TRIGGER_STAT_INACTIVE,
|
TASK_TRIGGER_STAT_INACTIVE,
|
||||||
TASK_TRIGGER_STAT_ACTIVE)) {
|
TASK_TRIGGER_STAT_ACTIVE)) {
|
||||||
smaDebug("vgId:%d, rsma persist task is active again", vid);
|
smaDebug("vgId:%d, rsma persist task is active again", SMA_VID(pSma));
|
||||||
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||||
TASK_TRIGGER_STAT_CANCELLED,
|
TASK_TRIGGER_STAT_CANCELLED,
|
||||||
TASK_TRIGGER_STAT_FINISHED)) {
|
TASK_TRIGGER_STAT_FINISHED)) {
|
||||||
smaDebug("vgId:%d, rsma persist task is cancelled", vid);
|
smaDebug("vgId:%d, rsma persist task is cancelled", SMA_VID(pSma));
|
||||||
} else {
|
} else {
|
||||||
smaWarn("vgId:%d, rsma persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
smaWarn("vgId:%d, rsma persist task in stat %" PRIi8, SMA_VID(pSma), atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||||
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId);
|
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId);
|
||||||
taosThreadExit(NULL);
|
taosThreadExit(NULL);
|
||||||
|
@ -1166,8 +1199,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
|
||||||
TASK_TRIGGER_STAT_FINISHED)) {
|
TASK_TRIGGER_STAT_FINISHED)) {
|
||||||
smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma));
|
smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma));
|
||||||
} else {
|
} else {
|
||||||
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)),
|
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, SMA_VID(pRSmaStat->pSma),
|
||||||
SMA_VID(pRSmaStat->pSma));
|
atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||||
|
@ -1216,6 +1249,9 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED);
|
||||||
smaDebug("rsma persistence not start since cancelled and finished");
|
smaDebug("rsma persistence not start since cancelled and finished");
|
||||||
} break;
|
} break;
|
||||||
|
case TASK_TRIGGER_STAT_PAUSED: {
|
||||||
|
smaDebug("rsma persistence not start since paused");
|
||||||
|
} break;
|
||||||
case TASK_TRIGGER_STAT_INACTIVE: {
|
case TASK_TRIGGER_STAT_INACTIVE: {
|
||||||
smaDebug("rsma persistence not start since inactive");
|
smaDebug("rsma persistence not start since inactive");
|
||||||
} break;
|
} break;
|
||||||
|
|
|
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sma.h"
@@ -179,72 +179,83 @@ void tdCloseTFile(STFile *pTFile) {
   }
 }

-void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, char *outputName) {
-  snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/%s", vgId, dname, fname);
+void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); }
+
+void tdGetVndFileName(int32_t vgId, const char *dname, const char *fname, int64_t version, char *outputName) {
+  if (version < 0) {
+    snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s", vgId, dname, vgId, fname);
+  } else {
+    snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/v%d%s%" PRIi64, vgId, dname, vgId, fname, version);
+  }
 }

-int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) {
-  char    fullname[TSDB_FILENAME_LEN];
-  SDiskID did = {0};
+void tdGetVndDirName(int32_t vgId, const char *dname, char *outputName) {
+  snprintf(outputName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", vgId, dname);
+}

+int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) {
   TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK);
   TD_TFILE_SET_CLOSED(pTFile);

   memset(&(pTFile->info), 0, sizeof(pTFile->info));
   pTFile->info.magic = TD_FILE_INIT_MAGIC;

-  if (tfsAllocDisk(pTfs, 0, &did) < 0) {
-    terrno = TSDB_CODE_NO_AVAIL_DISK;
+  char tmpName[TSDB_FILENAME_LEN * 2 + 32] = {0};
+  snprintf(tmpName, TSDB_FILENAME_LEN * 2 + 32, "%s%s%s", dname, TD_DIRSEP, fname);
+  int32_t tmpNameLen = strlen(tmpName) + 1;
+  pTFile->fname = taosMemoryMalloc(tmpNameLen);
+  if (!pTFile->fname) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
   }
-
-  tfsInitFile(pTfs, &(pTFile->f), did, fname);
+  tstrncpy(pTFile->fname, tmpName, tmpNameLen);

   return 0;
 }

-int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) {
+int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) {
   ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC);

   pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
   if (pTFile->pFile == NULL) {
     if (errno == ENOENT) {
       // Try to create directory recursively
-      char *s = strdup(TD_TFILE_REL_NAME(pTFile));
-      if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_TFILE_DID(pTFile)) < 0) {
-        taosMemoryFreeClear(s);
-        return -1;
-      }
-      taosMemoryFreeClear(s);
-
-      pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
-      if (pTFile->pFile == NULL) {
+      if (taosMulMkDir(taosDirName(TD_TFILE_FULL_NAME(pTFile))) != 0) {
         terrno = TAOS_SYSTEM_ERROR(errno);
         return -1;
+      } else {
+        pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
+        if (pTFile->pFile == NULL) {
+          terrno = TAOS_SYSTEM_ERROR(errno);
+          return -1;
+        }
       }
-    } else {
-      terrno = TAOS_SYSTEM_ERROR(errno);
+    }
+
+    if (!updateHeader) {
+      return 0;
+    }
+
+    pTFile->info.fsize += TD_FILE_HEAD_SIZE;
+    pTFile->info.fver = 0;
+
+    if (tdUpdateTFileHeader(pTFile) < 0) {
+      tdCloseTFile(pTFile);
+      tdRemoveTFile(pTFile);
       return -1;
     }
   }

-  if (!updateHeader) {
-    return 0;
-  }
-
-  pTFile->info.fsize += TD_FILE_HEAD_SIZE;
-  pTFile->info.fver = 0;
-
-  if (tdUpdateTFileHeader(pTFile) < 0) {
-    tdCloseTFile(pTFile);
-    tdRemoveTFile(pTFile);
-    return -1;
-  }
-
   return 0;
 }

-int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_TFILE_F(pTFile)); }
+int32_t tdRemoveTFile(STFile *pTFile) {
+  if (taosRemoveFile(TD_TFILE_FULL_NAME(pTFile)) != 0) {
+    terrno = TAOS_SYSTEM_ERROR(errno);
+    return -1;
+  };
+  return 0;
+}

 // smaXXXUtil ================
 // ...
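Note: with the reworked helpers above, a qtaskinfo file name is now derived from the vgroup id and a version number. A small illustrative call (hypothetical values, assuming the signatures shown in this diff):

    char name[TSDB_FILENAME_LEN] = {0};
    // vgId = 2, dname = "rsma", fname prefix = "qtaskinfo.ver", version = 100
    tdGetVndFileName(2, "rsma", "qtaskinfo.ver", 100, name);
    // name is now "vnode/vnode2/rsma/v2qtaskinfo.ver100"; a negative version drops the numeric suffix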
@@ -183,13 +183,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
   } else {
     ASSERT(0);
   }
-  STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
-  if (pOffset == NULL || pOffset->val.version < offset.val.version) {
-    if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
-      ASSERT(0);
-      return -1;
-    }
+  /*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/
+  /*if (pOffset != NULL) {*/
+  /*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/
+  if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
+    ASSERT(0);
+    return -1;
   }
+  /*}*/
+  /*}*/

   return 0;
 }
@@ -375,8 +377,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {

     taosMemoryFree(pCkHead);
   } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
-    tqInfo("retrieve using snapshot req offset: uid %ld ts %ld, actual offset: uid %ld ts %ld", dataRsp.reqOffset.uid,
-           dataRsp.reqOffset.ts, fetchOffsetNew.uid, fetchOffsetNew.ts);
+    tqInfo("retrieve using snapshot actual offset: uid %ld ts %ld", fetchOffsetNew.uid, fetchOffsetNew.ts);
     if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
       ASSERT(0);
     }
@@ -120,7 +120,9 @@ bool tqNextDataBlock(SStreamReader* pHandle) {
       return true;
     }
     void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
+    /*tqDebug("search uid %ld", pHandle->msgIter.uid);*/
     if (ret != NULL) {
+      /*tqDebug("find uid %ld", pHandle->msgIter.uid);*/
       return true;
     }
   }
@@ -28,12 +28,12 @@ const SVnodeCfg vnodeCfgDefault = {
                    .update = 1,
                    .compression = 2,
                    .slLevel = 5,
-                   .days = 10,
+                   .days = 14400,
                    .minRows = 100,
                    .maxRows = 4096,
-                   .keep2 = 3650,
-                   .keep0 = 3650,
-                   .keep1 = 3650},
+                   .keep2 = 5256000,
+                   .keep0 = 5256000,
+                   .keep1 = 5256000},
     .walCfg =
         {.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
     .hashBegin = 0,
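Note: the new defaults appear to switch these durations from days to minutes: 14400 minutes = 10 days and 5256000 minutes = 3650 days, so the effective retention window is unchanged.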
@@ -229,6 +229,9 @@ int vnodeCommit(SVnode *pVnode) {
     return -1;
   }

+  // preCommit
+  // TODO
+
   // commit each sub-system
   if (metaCommit(pVnode->pMeta) < 0) {
     ASSERT(0);
@@ -269,6 +272,9 @@ int vnodeCommit(SVnode *pVnode) {

   pVnode->state.committed = info.state.committed;

+  // postCommit
+  smaPostCommit(pVnode->pSma);
+
   // apply the commit (TODO)
   vnodeBufPoolReset(pVnode->onCommit);
   pVnode->onCommit->next = pVnode->pPool;
@@ -779,6 +779,7 @@ _exit:
   taosArrayDestroy(submitRsp.pArray);

   // TODO: the partial success scenario and the error case
+  // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level 1/level 2.
   // TODO: refactor
   if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
     tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
@@ -356,6 +356,7 @@ typedef struct SStreamBlockScanInfo {
   SUpdateInfo* pUpdateInfo;

   EStreamScanMode scanMode;
+  SOperatorInfo* pStreamScanOp;
   SOperatorInfo* pSnapshotReadOp;
   SArray* childIds;
   SessionWindowSupporter sessionSup;
@@ -145,10 +145,12 @@ static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo,
       continue;
     }

+    // TODO handle ntb case
     if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
       continue;
     }
-    // TODO handle ntb case
+    /*pScanInfo->pStreamScanOp->pTaskInfo->tableqinfoList.*/
+    // handle multiple partition

     taosArrayPush(qa, id);
   }
@@ -2027,8 +2027,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf

   ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

-  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, GET_TASKID(pTaskInfo),
-         pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, sourceIndex, totalSources);
+  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
+         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
+         sourceIndex, totalSources);

   pMsg->header.vgId = htonl(pSource->addr.nodeId);
   pMsg->sId = htobe64(pSource->schedId);
@@ -2163,8 +2164,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
     SSDataBlock*         pRes = pExchangeInfo->pResult;
     SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
     if (pRsp->numOfRows == 0) {
-      qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
-             ", completed:%d try next %d/%" PRIzu,
+      qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
+             ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
              GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
              pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
       pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d"
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||||
|
" execId:%d"
|
||||||
" index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
|
" index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
|
||||||
", completed:%d try next %d/%" PRIzu,
|
", completed:%d try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows, pDataInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows,
|
||||||
pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
|
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
|
||||||
completed += 1;
|
completed += 1;
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
|
||||||
", totalBytes:%" PRIu64,
|
", totalBytes:%" PRIu64,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
|
||||||
pLoadInfo->totalSize);
|
pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||||
|
@@ -2267,8 +2269,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
     SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

     if (pDataInfo->code != TSDB_CODE_SUCCESS) {
-      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
-             pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
+      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
+             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
       pOperator->pTaskInfo->code = pDataInfo->code;
       return NULL;
     }
@@ -2276,8 +2278,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
     SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
     SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
     if (pRsp->numOfRows == 0) {
-      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
-             " try next",
+      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
+             ", totalRows:%" PRIu64 " try next",
              GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
              pDataInfo->totalRows, pLoadInfo->totalRows);

@@ -2296,16 +2298,17 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
     if (pRsp->completed == 1) {
       qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
              ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
-             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pDataInfo->totalRows,
-             pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources);
+             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
+             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
+             totalSources);

       pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
       pExchangeInfo->current += 1;
     } else {
       qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
              ", totalBytes:%" PRIu64,
-             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows,
-             pLoadInfo->totalSize);
+             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
+             pLoadInfo->totalRows, pLoadInfo->totalSize);
     }

     pOperator->resultInfo.totalRows += pRes->info.rows;
@@ -1274,6 +1274,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
   pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
   pInfo->groupId = 0;
   pInfo->pPullDataRes = createPullDataBlock();
+  pInfo->pStreamScanOp = pOperator;

   pOperator->name = "StreamBlockScanOperator";
   pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
@@ -276,8 +276,6 @@ extern SSchedulerMgmt schMgmt;

 #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
 #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
-#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
-#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)

 #define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
 #define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))

@@ -309,7 +307,10 @@ extern SSchedulerMgmt schMgmt;
 #define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
 #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
 #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
-#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0))
+#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
+#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH)
+#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
+#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)

 #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
 #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
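The new macros above split the old blanket retry check into two explicit questions: whether the message type is one that may be redirected at all (query, merge query, fetch), and whether the error is a redirect-class code or a network error whose response still carries a payload. A minimal sketch of how they compose is below; the function and parameter names are illustrative only, while the macro and error-code names come from the hunk above.

/* Illustrative sketch, not part of this commit: composing the new scheduler macros. */
static void classifyTaskError(int32_t reqMsgType, int32_t rspCode, int32_t rspLen) {
  if (SCH_NEED_REDIRECT(reqMsgType, rspCode, rspLen)) {
    // redirect-class error, or a network error whose response still has a payload:
    // fetch a fresh epSet and re-dispatch the task
  } else if (SCH_NEED_RETRY(reqMsgType, rspCode)) {
    // broken link / network unavailable on a query, merge-query or fetch message,
    // or TSDB_CODE_SCH_TIMEOUT_ERROR: re-launch the task without redirecting
  } else {
    // any other error is final for this task
  }
}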
@@ -835,7 +835,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
     return TSDB_CODE_SUCCESS;
   }

-  if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) {
+  if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) {
     *needRetry = false;
     SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
     return TSDB_CODE_SUCCESS;
@@ -23,7 +23,7 @@


 int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
-  int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
+  int32_t lastMsgType = pTask->lastMsgType;
   int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
   int32_t reqMsgType = msgType - 1;
   switch (msgType) {

@@ -42,7 +42,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
              TMSG_INFO(msgType));
      }

-     SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
+     //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_SCH_FETCH_RSP:
      if (lastMsgType != reqMsgType && -1 != lastMsgType) {

@@ -57,7 +57,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

-     SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
+     //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
      return TSDB_CODE_SUCCESS;
    case TDMT_VND_CREATE_TABLE_RSP:
    case TDMT_VND_DROP_TABLE_RSP:

@@ -82,7 +82,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
        SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
      }

-     SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
+     //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);

      return TSDB_CODE_SUCCESS;
  }
@@ -396,7 +396,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {

   SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));

-  if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || SCH_SUB_TASK_NETWORK_ERR(rspCode, pMsg->len > 0)) {
+  int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
+  if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) {
     code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode);
     goto _return;
   }
@@ -855,6 +856,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
            addr->nodeId, epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port,
            trans->pTrans, trans->pHandle);

+  if (pTask) {
+    pTask->lastMsgType = msgType;
+  }

   int64_t transporterId = 0;
   code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
@@ -1098,8 +1102,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
       break;
   }

-  SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
-
   SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
   SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle,
                                (rpcCtx.args ? &rpcCtx : NULL)));

@@ -1112,7 +1114,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,

 _return:

-  SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
+  pTask->lastMsgType = -1;
   schFreeRpcCtx(&rpcCtx);

   taosMemoryFreeClear(msg);
@@ -590,6 +590,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
 TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
 TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
 TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
+TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted")


 //tq
 TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")
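A code added with TAOS_DEFINE_ERROR becomes user-visible through tstrerror(), the same helper the scheduler logs use elsewhere in this commit. A hypothetical usage sketch for the new rsma code, not taken from the commit:

// Hypothetical sketch: surfacing TSDB_CODE_RSMA_FILE_CORRUPTED to callers.
static int32_t checkRsmaFile(bool corrupted) {  // 'corrupted' stands in for a real integrity check
  if (corrupted) {
    terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;     // rendered as "Rsma file corrupted" by tstrerror()
    return -1;
  }
  return 0;
}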
@@ -662,6 +662,12 @@ class TDCom:
             return res_list
         else:
             tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}")

+    def killProcessor(self, processorName):
+        if (platform.system().lower() == 'windows'):
+            os.system("TASKKILL /F /IM %s.exe"%processorName)
+        else:
+            os.system('pkill %s'%processorName)
+

 def is_json(msg):
@@ -25,7 +25,7 @@ $rowsPerCtb = 10
 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
 #---- global parameters end ----#

-$pullDelay = 5
+$pullDelay = 2
 $ifcheckdata = 1
 $ifmanualcommit = 1
 $showMsg = 1
@@ -355,40 +355,6 @@ class TDTestCase:

        return

-    def test_case4(self):
-        self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10)
-        tdSql.execute("use db1;")
-        tdSql.query("show dnodes;")
-        dnodeId=tdSql.getData(0,0)
-        print(dnodeId)
-        tdSql.execute("create qnode on dnode %s"%dnodeId)
-        tdSql.query("select max(c1) from stb10;")
-        maxQnode=tdSql.getData(0,0)
-        tdSql.query("select min(c1) from stb11;")
-        minQnode=tdSql.getData(0,0)
-        tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
-        unionQnode=tdSql.queryResult
-        tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
-        unionallQnode=tdSql.queryResult
-
-        # tdSql.query("show qnodes;")
-        # qnodeId=tdSql.getData(0,0)
-        tdSql.execute("drop qnode on dnode %s"%dnodeId)
-        tdSql.execute("reset query cache")
-        tdSql.query("select max(c1) from stb10;")
-        tdSql.checkData(0, 0, "%s"%maxQnode)
-        tdSql.query("select min(c1) from stb11;")
-        tdSql.checkData(0, 0, "%s"%minQnode)
-        tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
-        unionVnode=tdSql.queryResult
-        assert unionQnode == unionVnode
-        tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
-        unionallVnode=tdSql.queryResult
-        assert unionallQnode == unionallVnode
-
-
-        # tdSql.execute("create qnode on dnode %s"%dnodeId)
-
    # run case
    def run(self):
@@ -82,7 +82,7 @@ class TDTestCase:
        return con

    def test_stmt_set_tbname_tag(self,conn):
-       dbname = "stmt_set_tbname_tag"
+       dbname = "stmt_tag"

        try:
            conn.execute("drop database if exists %s" % dbname)

@@ -196,31 +196,31 @@ class TDTestCase:
            assert rows9[0][0] == 12, 'fourth case is failed'
            assert rows9[1][0] == 12, 'fourth case is failed'

-           # #query: conversion Functions
-           # querystmt4=conn.statement("select cast( ? as bigint) from log ")
-           # queryparam4=new_bind_params(1)
-           # print(type(queryparam4))
-           # queryparam4[0].binary('1232a')
-           # querystmt4.bind_param(queryparam4)
-           # querystmt4.execute()
-           # result4=querystmt4.use_result()
-           # rows4=result4.fetch_all()
-           # print("5",rows4)
-           # assert rows4[0][0] == 1232
-           # assert rows4[1][0] == 1232
+           #query: conversion Functions
+           querystmt4=conn.statement("select cast( ? as bigint) from log ")
+           queryparam4=new_bind_params(1)
+           print(type(queryparam4))
+           queryparam4[0].binary('1232a')
+           querystmt4.bind_param(queryparam4)
+           querystmt4.execute()
+           result4=querystmt4.use_result()
+           rows4=result4.fetch_all()
+           print("5",rows4)
+           assert rows4[0][0] == 1232
+           assert rows4[1][0] == 1232

-           # querystmt4=conn.statement("select cast( ? as binary(10)) from log ")
-           # queryparam4=new_bind_params(1)
-           # print(type(queryparam4))
-           # queryparam4[0].int(123)
-           # querystmt4.bind_param(queryparam4)
-           # querystmt4.execute()
-           # result4=querystmt4.use_result()
-           # rows4=result4.fetch_all()
-           # print("6",rows4)
-           # assert rows4[0][0] == '123'
-           # assert rows4[1][0] == '123'
+           querystmt4=conn.statement("select cast( ? as binary(10)) from log ")
+           queryparam4=new_bind_params(1)
+           print(type(queryparam4))
+           queryparam4[0].int(123)
+           querystmt4.bind_param(queryparam4)
+           querystmt4.execute()
+           result4=querystmt4.use_result()
+           rows4=result4.fetch_all()
+           print("6",rows4)
+           assert rows4[0][0] == '123'
+           assert rows4[1][0] == '123'

            # #query: datatime Functions
@@ -84,21 +84,21 @@ class TDTestCase:
    def test_stmt_insert_multi(self,conn):
        # type: (TaosConnection) -> None

-       dbname = "pytest_taos_stmt_multi"
+       dbname = "db_stmt"
        try:
            conn.execute("drop database if exists %s" % dbname)
            conn.execute("create database if not exists %s" % dbname)
            conn.select_db(dbname)

            conn.execute(
-               "create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
+               "create table if not exists stb1(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
                bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
                ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
            )
            # conn.load_table_info("log")

            start = datetime.now()
-           stmt = conn.statement("insert into log values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
+           stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")

            params = new_multi_binds(16)
            params[0].timestamp((1626861392589, 1626861392590, 1626861392591))

@@ -125,7 +125,7 @@ class TDTestCase:
            assert stmt.affected_rows == 3

            #query 1
-           querystmt=conn.statement("select ?,bu from log")
+           querystmt=conn.statement("select ?,bu from stb1")
            queryparam=new_bind_params(1)
            print(type(queryparam))
            queryparam[0].binary("ts")

@@ -135,7 +135,7 @@ class TDTestCase:
            # rows=result.fetch_all()
            # print( querystmt.use_result())

-           # result = conn.query("select * from log")
+           # result = conn.query("select * from stb1")
            rows=result.fetch_all()
            # rows=result.fetch_all()
            print(rows)

@@ -144,7 +144,7 @@ class TDTestCase:
            assert rows[2][1] == None

            #query 2
-           querystmt1=conn.statement("select * from log where bu < ?")
+           querystmt1=conn.statement("select * from stb1 where bu < ?")
            queryparam1=new_bind_params(1)
            print(type(queryparam1))
            queryparam1[0].int(4)
@@ -17,6 +17,8 @@ import threading as thd
 import multiprocessing as mp
 from numpy.lib.function_base import insert
 import taos
+from util.dnodes import TDDnode
+from util.dnodes import *
 from util.log import *
 from util.cases import *
 from util.sql import *
@@ -30,9 +32,9 @@ class TDTestCase:
    #
    # --------------- main frame -------------------
    #
-   clientCfgDict = {'queryproxy': '1','debugFlag': 135}
-   clientCfgDict["queryproxy"] = '2'
-   clientCfgDict["debugFlag"] = 143
+   clientCfgDict = {'queryPolicy': '1','debugFlag': 135}
+   clientCfgDict["queryPolicy"] = '1'
+   clientCfgDict["debugFlag"] = 131

    updatecfgDict = {'clientCfg': {}}
    updatecfgDict = {'debugFlag': 143}
@@ -62,7 +64,7 @@ class TDTestCase:
        return buildPath

    # init
-   def init(self, conn, logSql):
+   def init(self, conn, logSql=True):
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        # tdSql.prepare()
@@ -292,12 +294,13 @@ class TDTestCase:

        tdLog.debug("-----create database and muti-thread create tables test------- ")

-   def test_case4(self):
+   def test_case1(self):
        self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10)
        tdSql.execute("use db1;")
        tdSql.query("show dnodes;")
        dnodeId=tdSql.getData(0,0)
        print(dnodeId)
+       tdLog.debug("create qnode on dnode %s"%dnodeId)
        tdSql.execute("create qnode on dnode %s"%dnodeId)
        tdSql.query("select max(c1) from stb10;")
        maxQnode=tdSql.getData(0,0)
@@ -310,6 +313,7 @@ class TDTestCase:

        # tdSql.query("show qnodes;")
        # qnodeId=tdSql.getData(0,0)
+       tdLog.debug("drop qnode on dnode %s"%dnodeId)
        tdSql.execute("drop qnode on dnode %s"%dnodeId)
        tdSql.execute("reset query cache")
        tdSql.query("select max(c1) from stb10;")
@@ -323,15 +327,156 @@ class TDTestCase:
        unionallVnode=tdSql.queryResult
        assert unionallQnode == unionallVnode

+       queryPolicy=2
+       simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath()
+       cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg
+       os.system(cmd)
+       # tdDnodes.stop(1)
+       # tdDnodes.start(1)
+       tdSql.execute("reset query cache")
+       tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
+       tdSql.query("show local variables;")
+       for i in range(tdSql.queryRows):
+           if tdSql.queryResult[i][0] == "queryPolicy" :
+               if int(tdSql.queryResult[i][1]) == int(queryPolicy):
+                   tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
+               else :
+                   tdLog.debug(tdSql.queryResult)
+                   tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
+       tdSql.execute("reset query cache")
+
+       tdSql.execute("use db1;")
+       tdSql.query("show dnodes;")
+       dnodeId=tdSql.getData(0,0)
+       tdLog.debug("create qnode on dnode %s"%dnodeId)
+
+       tdSql.execute("create qnode on dnode %s"%dnodeId)
+       tdSql.query("select max(c1) from stb10;")
+       assert maxQnode==tdSql.getData(0,0)
+       tdSql.query("select min(c1) from stb11;")
+       assert minQnode==tdSql.getData(0,0)
+       tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
+       assert unionQnode==tdSql.queryResult
+       tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
+       assert unionallQnode==tdSql.queryResult
+
+       # tdSql.query("show qnodes;")
+       # qnodeId=tdSql.getData(0,0)
+       tdLog.debug("drop qnode on dnode %s"%dnodeId)
+       tdSql.execute("drop qnode on dnode %s"%dnodeId)
+       tdSql.execute("reset query cache")
+       tdSql.query("select max(c1) from stb10;")
+       assert maxQnode==tdSql.getData(0,0)
+       tdSql.query("select min(c1) from stb11;")
+       assert minQnode==tdSql.getData(0,0)
+       tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
+       tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")

        # tdSql.execute("create qnode on dnode %s"%dnodeId)

+       queryPolicy=3
+       simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath()
+       cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg
+       os.system(cmd)
+       # tdDnodes.stop(1)
+       # tdDnodes.start(1)
+       tdSql.execute("reset query cache")
+       tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
+       tdSql.query("show local variables;")
+       for i in range(tdSql.queryRows):
+           if tdSql.queryResult[i][0] == "queryPolicy" :
+               if int(tdSql.queryResult[i][1]) == int(queryPolicy):
+                   tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
+               else :
+                   tdLog.debug(tdSql.queryResult)
+                   tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
+       tdSql.execute("reset query cache")
+
+       tdSql.execute("use db1;")
+       tdSql.query("show dnodes;")
+       dnodeId=tdSql.getData(0,0)
+       tdLog.debug("create qnode on dnode %s"%dnodeId)
+
+       tdSql.execute("create qnode on dnode %s"%dnodeId)
+       tdSql.query("select max(c1) from stb10;")
+       assert maxQnode==tdSql.getData(0,0)
+       tdSql.query("select min(c1) from stb11;")
+       assert minQnode==tdSql.getData(0,0)
+       tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
+       assert unionQnode==tdSql.queryResult
+       tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
+       assert unionallQnode==tdSql.queryResult
+
+   def test_case2(self):
+       self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 10, 2, 1*10)
+       tdSql.query("show qnodes")
+       if tdSql.queryRows == 1 :
+           tdLog.debug("drop qnode on dnode 1")
+           tdSql.execute("drop qnode on dnode 1")
+       queryPolicy=2
+       simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath()
+       cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg
+       os.system(cmd)
+       # tdDnodes.stop(1)
+       # tdDnodes.start(1)
+       tdSql.execute("reset query cache")
+       tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
+       tdSql.query("show local variables;")
+       for i in range(tdSql.queryRows):
+           if tdSql.queryResult[i][0] == "queryPolicy" :
+               if int(tdSql.queryResult[i][1]) == int(queryPolicy):
+                   tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
+               else :
+                   tdLog.debug(tdSql.queryResult)
+                   tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
+       tdSql.execute("use db1;")
+       tdSql.error("select max(c1) from stb10;")
+       tdSql.error("select min(c1) from stb11;")
+       tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
+       tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
+
+       tdSql.query("select max(c1) from stb10_0;")
+       tdSql.query("select min(c1) from stb11_0;")
+
+   def test_case3(self):
+
+       tdSql.execute('alter local "queryPolicy" "3"')
+       tdLog.debug("create qnode on dnode 1")
+       tdSql.execute("create qnode on dnode 1")
+       tdSql.execute("use db1;")
+       tdSql.query("show dnodes;")
+       dnodeId=tdSql.getData(0,0)
+       print(dnodeId)
+
+       tdSql.query("select max(c1) from stb10;")
+       maxQnode=tdSql.getData(0,0)
+       tdSql.query("select min(c1) from stb11;")
+       minQnode=tdSql.getData(0,0)
+       tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
+       unionQnode=tdSql.queryResult
+       tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
+       unionallQnode=tdSql.queryResult
+
+       # tdSql.query("show qnodes;")
+       # qnodeId=tdSql.getData(0,0)
+       tdLog.debug("drop qnode on dnode %s"%dnodeId)
+
+       tdSql.execute("drop qnode on dnode %s"%dnodeId)
+       tdSql.execute("reset query cache")
+
+       tdSql.error("select max(c1) from stb10;")
+       tdSql.error("select min(c1) from stb11;")
+       tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
+       tdSql.error("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")

    # run case
    def run(self):

        # test qnode
-       self.test_case4()
-       tdLog.debug(" LIMIT test_case3 ............ [OK]")
+       self.test_case1()
+       self.test_case2()

+       self.test_case3()
+       # tdLog.debug(" LIMIT test_case3 ............ [OK]")


        return
@@ -65,31 +65,6 @@ class TDTestCase:
        self._async_raise(thread.ident, SystemExit)


-   def insertData(self,countstart,countstop):
-       # fisrt add data : db\stable\childtable\general table
-
-       for couti in range(countstart,countstop):
-           tdLog.debug("drop database if exists db%d" %couti)
-           tdSql.execute("drop database if exists db%d" %couti)
-           print("create database if not exists db%d replica 1 duration 300" %couti)
-           tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
-           tdSql.execute("use db%d" %couti)
-           tdSql.execute(
-           '''create table stb1
-           (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
-           tags (t1 int)
-           '''
-           )
-           tdSql.execute(
-           '''
-           create table t1
-           (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
-           '''
-           )
-           for i in range(4):
-               tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
-
-
    def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName': 'db',

@@ -143,7 +118,8 @@ class TDTestCase:
        threads=[]
        for i in range(restartNumbers):
            dbNameIndex = '%s%d'%(paraDict["dbName"],i)
-           threads.append(threading.Thread(target=clusterComCreate.create_databases, args=(tdSql, dbNameIndex,paraDict["dbNumbers"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])))
+           newTdSql=tdCom.newTdSql()
+           threads.append(threading.Thread(target=clusterComCreate.create_databases, args=(newTdSql, dbNameIndex,paraDict["dbNumbers"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])))

        for tr in threads:
            tr.start()
@@ -39,6 +39,7 @@ class ClusterComCheck:

    def checkDnodes(self,dnodeNumbers):
        count=0
+       # print(tdSql)
        while count < 5:
            tdSql.query("show dnodes")
            # tdLog.debug(tdSql.queryResult)

@@ -85,7 +86,7 @@ class ClusterComCheck:
                        tdLog.debug("check %s_%d that status is ready "%(dbNameIndex,j))
                    else:
                        continue
-           print(query_status)
+           # print(query_status)
            count+=1
            if query_status == dbNumbers:
                tdLog.success("we find cluster with %d dnode and check all databases are ready within 5s! " %dbNumbers)
@@ -125,7 +125,6 @@ class ClusterComCreate:

    def create_databases(self,tsql,dbNameIndex,dbNumbers,dropFlag=1,vgroups=4,replica=1):
        for i in range(dbNumbers):
-           print(dbNumbers)
            if dropFlag == 1:
                tsql.execute("drop database if exists %s_%d"%(dbNameIndex,i))
            tsql.execute("create database if not exists %s_%d vgroups %d replica %d"%(dbNameIndex,i, vgroups, replica))
@@ -122,9 +122,9 @@ class TDTestCase:
        tdLog.info("wait the consume result")
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
-       # if expectRowsList[2] != resultList[0]:
-       #     tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
-       #     tdLog.exit("2 tmq consume rows error!")
+       if expectRowsList[2] != resultList[0]:
+           tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
+           tdLog.exit("2 tmq consume rows error!")

        time.sleep(10)
        for i in range(len(topicNameList)):
@@ -16,7 +16,7 @@ sys.path.append("./7-tmq")
 from tmqCommon import *

 class TDTestCase:
-    def __int__(self):
+    def __init__(self):
         self.vgroups = 1
         self.ctbNum = 10
         self.rowsPerTbl = 10000

@@ -216,7 +216,7 @@ class TDTestCase:
                    'rowsPerTbl': 10000,
                    'batchNum': 10,
                    'startTs': 1640966400000, # 2022-01-01 00:00:00.000
-                   'pollDelay': -1,
+                   'pollDelay': 10,
                    'showMsg': 1,
                    'showRow': 1,
                    'snapshot': 1}
@@ -281,10 +281,107 @@ class TDTestCase:

        tdLog.printNoPrefix("======== test case 3 end ...... ")

+   def tmqCase4(self):
+       tdLog.printNoPrefix("======== test case 4: ")
+       paraDict = {'dbName': 'dbt',
+                   'dropFlag': 1,
+                   'event': '',
+                   'vgroups': 1,
+                   'stbName': 'stb',
+                   'colPrefix': 'c',
+                   'tagPrefix': 't',
+                   'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+                   'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+                   'ctbPrefix': 'ctb',
+                   'ctbNum': 1,
+                   'rowsPerTbl': 10000,
+                   'batchNum': 10,
+                   'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+                   'pollDelay': 10,
+                   'showMsg': 1,
+                   'showRow': 1,
+                   'snapshot': 1}
+
+       paraDict['vgroups'] = self.vgroups
+       paraDict['ctbNum'] = self.ctbNum
+       paraDict['rowsPerTbl'] = self.rowsPerTbl
+
+       topicNameList = ['topic1']
+       expectRowsList = []
+       tmqCom.initConsumerTable()
+       # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
+       # tdLog.info("create stb")
+       # tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
+       # tdLog.info("create ctb")
+       # tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
+       # tdLog.info("insert data")
+       # tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
+
+       # tdDnodes.stop(1)
+       # tdDnodes.start(1)
+
+       tdLog.info("create topics from stb with filter")
+       queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
+       # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
+       sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
+       tdLog.info("create topic sql: %s"%sqlString)
+       tdSql.execute(sqlString)
+       tdSql.query(queryString)
+       expectRowsList.append(tdSql.getRows())
+       totalRowsInserted = expectRowsList[0]
+
+       # init consume info, and start tmq_sim, then check consume result
+       tdLog.info("insert consume info to consume processor")
+       consumerId = 5
+       expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
+       topicList = topicNameList[0]
+       ifcheckdata = 1
+       ifManualCommit = 1
+       keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
+       tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+       tdLog.info("start consume processor 0")
+       tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
+
+       tdLog.info("wait commit notify")
+       tmqCom.getStartCommitNotifyFromTmqsim()
+
+       tdLog.info("pkill consume processor")
+       tdCom.killProcessor("tmq_sim")
+
+       # time.sleep(10)
+
+       # reinit consume info, and start tmq_sim, then check consume result
+       tmqCom.initConsumerTable()
+       consumerId = 6
+       tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+       tdLog.info("start consume processor 1")
+       tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
+       tdLog.info("wait the consume result")
+
+       expectRows = 1
+       resultList = tmqCom.selectConsumeResult(expectRows)
+
+       actConsumeTotalRows = resultList[0]
+
+       if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
+           tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
+           tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
+           tdLog.exit("%d tmq consume rows error!"%consumerId)
+
+       time.sleep(10)
+       for i in range(len(topicNameList)):
+           tdSql.query("drop topic %s"%topicNameList[i])
+
+       tdLog.printNoPrefix("======== test case 4 end ...... ")
+
    def run(self):
        tdSql.prepare()
        self.tmqCase1()
        self.tmqCase2()
+       self.tmqCase3()
+       self.tmqCase4()

    def stop(self):
        tdSql.close()
@@ -121,7 +121,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
 python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
-# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
+python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
@@ -36,6 +36,7 @@
 #define MAX_CONSUMER_THREAD_CNT (16)
 #define MAX_VGROUP_CNT (32)

+int64_t now;
 typedef enum {
   NOTIFY_CMD_START_CONSUM,
   NOTIFY_CMD_START_COMMIT,
@@ -525,15 +526,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {

 static int32_t g_once_commit_flag = 0;
 static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
   pError("tmq_commit_cb_print() commit %d\n", code);

   if (0 == g_once_commit_flag) {
     g_once_commit_flag = 1;
     notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
   }

   char tmpString[128];
   taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
 }

 void build_consumer(SThreadInfo* pInfo) {
@@ -588,12 +589,10 @@ void build_topic_list(SThreadInfo* pInfo) {

 int32_t saveConsumeResult(SThreadInfo* pInfo) {
   char sqlStr[1024] = {0};

-  int64_t now = taosGetTimestampMs();
-
   // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
   sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
-          g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
+          g_stConfInfo.cdbName, atomic_fetch_add_64(&now, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
+          pInfo->consumeRowCnt, pInfo->checkresult);

   char tmpString[128];
   taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
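The change above replaces a per-call taosGetTimestampMs() with the global now that is initialized once in main() and advanced with atomic_fetch_add_64(), so concurrent consumer threads can no longer write identical timestamps (the table's primary key) into consumeresult. A minimal sketch of the idea, with hypothetical names:

/* Illustrative sketch only: one shared base timestamp, bumped atomically, yields a
 * distinct value per writer even when threads finish within the same millisecond. */
static int64_t g_baseTs;                     // set once at startup from taosGetTimestampMs()

static int64_t nextUniqueTs(void) {
  return atomic_fetch_add_64(&g_baseTs, 1);  // returns the previous value and advances the counter
}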
@@ -637,9 +636,9 @@ void loop_consume(SThreadInfo* pInfo) {
     }
   }

   int64_t lastTotalMsgs = 0;
   uint64_t lastPrintTime = taosGetTimestampMs();
   uint64_t startTs = taosGetTimestampMs();

   int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
   while (running) {
@@ -652,16 +651,16 @@ void loop_consume(SThreadInfo* pInfo) {
       taos_free_result(tmqMsg);

       totalMsgs++;

       int64_t currentPrintTime = taosGetTimestampMs();
       if (currentPrintTime - lastPrintTime > 10 * 1000) {
-        taosFprintfFile(g_fp,
-                        "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
-                        pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0/(currentPrintTime - lastPrintTime));
+        taosFprintfFile(
+            g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
+            pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / (currentPrintTime - lastPrintTime));
         lastPrintTime = currentPrintTime;
         lastTotalMsgs = totalMsgs;
       }

       if (0 == once_flag) {
         once_flag = 1;
         notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
@@ -678,7 +677,7 @@ void loop_consume(SThreadInfo* pInfo) {
       break;
     }
   }

   if (0 == running) {
     taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
   }
@@ -696,6 +695,7 @@ void* consumeThreadFunc(void* param) {
   pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
   if (pInfo->taos == NULL) {
     taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
+    ASSERT(0);
     return NULL;
   }
@@ -855,6 +855,8 @@ int32_t getConsumeInfo() {
 }

 int main(int32_t argc, char* argv[]) {
+  now = taosGetTimestampMs();
+
   parseArgument(argc, argv);
   getConsumeInfo();
   saveConfigToLogFile();
@@ -888,11 +890,11 @@ int main(int32_t argc, char* argv[]) {

   int64_t t = end - start;
   if (0 == t) t = 1;

   double tInMs = (double)t / 1000000.0;
   taosFprintfFile(g_fp,
                   "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
                   tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));

   taosFprintfFile(g_fp, "==== close tmqlog ====\n");
   taosCloseFile(&g_fp);