Merge branch '3.0' into 3.0test/jcy
This commit is contained in:
commit
3f9f85927c
|
@ -125,6 +125,7 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
|
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
|
||||||
FUNCTION_TYPE_TO_COLUMN,
|
FUNCTION_TYPE_TO_COLUMN,
|
||||||
FUNCTION_TYPE_GROUP_KEY,
|
FUNCTION_TYPE_GROUP_KEY,
|
||||||
|
FUNCTION_TYPE_CACHE_LAST_ROW,
|
||||||
|
|
||||||
// distributed splitting functions
|
// distributed splitting functions
|
||||||
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
|
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
|
||||||
|
|
|
@ -91,6 +91,7 @@ typedef struct SAggLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
SNodeList* pGroupKeys;
|
SNodeList* pGroupKeys;
|
||||||
SNodeList* pAggFuncs;
|
SNodeList* pAggFuncs;
|
||||||
|
bool hasLastRow;
|
||||||
} SAggLogicNode;
|
} SAggLogicNode;
|
||||||
|
|
||||||
typedef struct SProjectLogicNode {
|
typedef struct SProjectLogicNode {
|
||||||
|
|
|
@ -111,9 +111,9 @@ else
|
||||||
fi
|
fi
|
||||||
|
|
||||||
csudo=""
|
csudo=""
|
||||||
if command -v sudo > /dev/null; then
|
#if command -v sudo > /dev/null; then
|
||||||
csudo="sudo "
|
# csudo="sudo "
|
||||||
fi
|
#fi
|
||||||
|
|
||||||
function is_valid_version() {
|
function is_valid_version() {
|
||||||
[ -z $1 ] && return 1 || :
|
[ -z $1 ] && return 1 || :
|
||||||
|
@ -181,7 +181,9 @@ cd "${curr_dir}"
|
||||||
|
|
||||||
# 2. cmake executable file
|
# 2. cmake executable file
|
||||||
compile_dir="${top_dir}/debug"
|
compile_dir="${top_dir}/debug"
|
||||||
${csudo}rm -rf ${compile_dir}
|
if [ -d ${compile_dir} ]; then
|
||||||
|
rm -rf ${compile_dir}
|
||||||
|
fi
|
||||||
|
|
||||||
mkdir -p ${compile_dir}
|
mkdir -p ${compile_dir}
|
||||||
cd ${compile_dir}
|
cd ${compile_dir}
|
||||||
|
@ -258,9 +260,9 @@ if [ "$osType" != "Darwin" ]; then
|
||||||
if [[ "$pagMode" == "full" ]]; then
|
if [[ "$pagMode" == "full" ]]; then
|
||||||
if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
|
if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
|
||||||
cd ${top_dir}/tools/taos-tools/packaging/deb
|
cd ${top_dir}/tools/taos-tools/packaging/deb
|
||||||
|
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
|
||||||
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
||||||
|
|
||||||
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
|
|
||||||
${csudo}./make-taos-tools-deb.sh ${top_dir} \
|
${csudo}./make-taos-tools-deb.sh ${top_dir} \
|
||||||
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
|
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
|
||||||
fi
|
fi
|
||||||
|
@ -283,9 +285,9 @@ if [ "$osType" != "Darwin" ]; then
|
||||||
if [[ "$pagMode" == "full" ]]; then
|
if [[ "$pagMode" == "full" ]]; then
|
||||||
if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then
|
if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then
|
||||||
cd ${top_dir}/tools/taos-tools/packaging/rpm
|
cd ${top_dir}/tools/taos-tools/packaging/rpm
|
||||||
|
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g')
|
||||||
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
|
||||||
|
|
||||||
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g')
|
|
||||||
${csudo}./make-taos-tools-rpm.sh ${top_dir} \
|
${csudo}./make-taos-tools-rpm.sh ${top_dir} \
|
||||||
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
|
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
|
||||||
fi
|
fi
|
||||||
|
@ -300,7 +302,7 @@ if [ "$osType" != "Darwin" ]; then
|
||||||
|
|
||||||
${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName}
|
${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName}
|
||||||
${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName}
|
${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName}
|
||||||
# ${csudo}./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
|
${csudo}./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
|
||||||
|
|
||||||
else
|
else
|
||||||
# only make client for Darwin
|
# only make client for Darwin
|
||||||
|
|
|
@ -48,7 +48,6 @@ struct SSmaEnv {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t smaRef;
|
int32_t smaRef;
|
||||||
int32_t refId;
|
|
||||||
} SSmaMgmt;
|
} SSmaMgmt;
|
||||||
|
|
||||||
#define SMA_ENV_LOCK(env) ((env)->lock)
|
#define SMA_ENV_LOCK(env) ((env)->lock)
|
||||||
|
@ -63,12 +62,12 @@ struct STSmaStat {
|
||||||
|
|
||||||
struct SRSmaStat {
|
struct SRSmaStat {
|
||||||
SSma *pSma;
|
SSma *pSma;
|
||||||
int64_t refId;
|
int64_t refId; // shared by persistence/fetch tasks
|
||||||
void *tmrHandle;
|
void *tmrHandle; // for persistence task
|
||||||
tmr_h tmrId;
|
tmr_h tmrId; // for persistence task
|
||||||
int32_t tmrSeconds;
|
int32_t tmrSeconds; // for persistence task
|
||||||
int8_t triggerStat;
|
int8_t triggerStat; // for persistence task
|
||||||
int8_t runningStat;
|
int8_t runningStat; // for persistence task
|
||||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
||||||
|
|
||||||
// init smaMgmt
|
// init smaMgmt
|
||||||
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
|
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
|
||||||
if (smaMgmt.refId < 0) {
|
if (smaMgmt.smaRef < 0) {
|
||||||
smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM);
|
smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -50,6 +50,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
|
||||||
|
|
||||||
struct SRSmaInfoItem {
|
struct SRSmaInfoItem {
|
||||||
SRSmaInfo *pRsmaInfo;
|
SRSmaInfo *pRsmaInfo;
|
||||||
|
int64_t refId;
|
||||||
void *taskInfo; // qTaskInfo_t
|
void *taskInfo; // qTaskInfo_t
|
||||||
tmr_h tmrId;
|
tmr_h tmrId;
|
||||||
int8_t level;
|
int8_t level;
|
||||||
|
@ -60,11 +61,14 @@ struct SRSmaInfoItem {
|
||||||
|
|
||||||
struct SRSmaInfo {
|
struct SRSmaInfo {
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
SSma *pSma;
|
SRSmaStat *pStat;
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define RSMA_INFO_SMA(r) ((r)->pStat->pSma)
|
||||||
|
#define RSMA_INFO_STAT(r) ((r)->pStat)
|
||||||
|
|
||||||
struct SRSmaQTaskInfoItem {
|
struct SRSmaQTaskInfoItem {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
@ -107,22 +111,21 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId,
|
||||||
|
|
||||||
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
|
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
|
||||||
if (pInfo) {
|
if (pInfo) {
|
||||||
|
SSma *pSma = RSMA_INFO_SMA(pInfo);
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
SRSmaInfoItem *pItem = &pInfo->items[i];
|
SRSmaInfoItem *pItem = &pInfo->items[i];
|
||||||
if (pItem->taskInfo) {
|
if (pItem->taskInfo) {
|
||||||
smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pInfo->pSma), pInfo->suid,
|
smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
|
||||||
pItem->tmrId, i + 1);
|
i + 1);
|
||||||
taosTmrStopA(&pItem->tmrId);
|
taosTmrStopA(&pItem->tmrId);
|
||||||
tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pInfo->pSma), i + 1);
|
tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1);
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo",
|
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
|
||||||
SMA_VID(pInfo->pSma), pInfo->suid, i + 1);
|
pInfo->suid, i + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(pInfo->pTSchema);
|
taosMemoryFree(pInfo->pTSchema);
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info since empty", SMA_VID(pInfo->pSma), pInfo->suid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -255,6 +258,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
|
||||||
|
|
||||||
if (param->qmsg[idx]) {
|
if (param->qmsg[idx]) {
|
||||||
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
|
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
|
||||||
|
pItem->refId = RSMA_REF_ID(pRSmaInfo->pStat);
|
||||||
pItem->pRsmaInfo = pRSmaInfo;
|
pItem->pRsmaInfo = pRSmaInfo;
|
||||||
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
|
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
|
||||||
if (!pItem->taskInfo) {
|
if (!pItem->taskInfo) {
|
||||||
|
@ -340,7 +344,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pRSmaInfo->pTSchema = pTSchema;
|
pRSmaInfo->pTSchema = pTSchema;
|
||||||
pRSmaInfo->pSma = pSma;
|
pRSmaInfo->pStat = pStat;
|
||||||
pRSmaInfo->suid = suid;
|
pRSmaInfo->suid = suid;
|
||||||
|
|
||||||
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
|
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
|
||||||
|
@ -522,7 +526,7 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
|
||||||
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
|
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
|
||||||
SArray *pResult = NULL;
|
SArray *pResult = NULL;
|
||||||
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
|
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
|
||||||
SSma *pSma = pRSmaInfo->pSma;
|
SSma *pSma = RSMA_INFO_SMA(pRSmaInfo);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock *output = NULL;
|
SSDataBlock *output = NULL;
|
||||||
|
@ -585,21 +589,29 @@ _err:
|
||||||
*/
|
*/
|
||||||
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
SRSmaInfoItem *pItem = param;
|
SRSmaInfoItem *pItem = param;
|
||||||
SSma *pSma = pItem->pRsmaInfo->pSma;
|
SSma *pSma = NULL;
|
||||||
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv);
|
SRSmaStat *pStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, pItem->refId);
|
||||||
|
if (!pStat) {
|
||||||
|
smaDebug("rsma fetch task not start since already destroyed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSma = RSMA_INFO_SMA(pItem->pRsmaInfo);
|
||||||
|
|
||||||
|
// if rsma trigger stat in cancelled or finished, not start fetch task anymore
|
||||||
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
|
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
|
||||||
if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
|
if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
|
||||||
smaDebug("vgId:%d, level %" PRIi8 " not fetch since stat is cancelled for table suid:%" PRIi64, SMA_VID(pSma),
|
taosReleaseRef(smaMgmt.smaRef, pItem->refId);
|
||||||
pItem->level, pItem->pRsmaInfo->suid);
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled",
|
||||||
|
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t fetchTriggerStat =
|
int8_t fetchTriggerStat =
|
||||||
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
|
||||||
if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
|
if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
|
||||||
smaDebug("vgId:%d, level %" PRIi8 " stat is active for table suid:%" PRIi64, SMA_VID(pSma), pItem->level,
|
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
|
||||||
pItem->pRsmaInfo->suid);
|
pItem->level, pItem->pRsmaInfo->suid);
|
||||||
|
|
||||||
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdRefSmaStat(pSma, (SSmaStat *)pStat);
|
||||||
|
|
||||||
|
@ -610,9 +622,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, level %" PRIi8 " stat is inactive for table suid:%" PRIi64, SMA_VID(pSma), pItem->level,
|
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
|
||||||
pItem->pRsmaInfo->suid);
|
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
|
||||||
}
|
}
|
||||||
|
_end:
|
||||||
|
taosReleaseRef(smaMgmt.smaRef, pItem->refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid,
|
||||||
|
@ -632,7 +646,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
|
||||||
|
|
||||||
tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT);
|
tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT);
|
||||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||||
smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma));
|
|
||||||
|
|
||||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
|
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
|
||||||
|
@ -1036,7 +1049,7 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||||
qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo;
|
qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo;
|
||||||
if (!taskInfo) {
|
if (!taskInfo) {
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1044,27 +1057,20 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int8_t type = (int8_t)(i + 1);
|
int8_t type = (int8_t)(i + 1);
|
||||||
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
|
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
|
||||||
smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1,
|
smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid,
|
||||||
terrstr(terrno));
|
i + 1, terrstr(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (!pOutput || len <= 0) {
|
if (!pOutput || len <= 0) {
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success but no output(len %d), not persist",
|
smaDebug("vgId:%d, rsma, table %" PRIi64
|
||||||
|
" level %d serialize qTaskInfo success but no output(len %d), not persist",
|
||||||
vid, pRSmaInfo->suid, i + 1, len);
|
vid, pRSmaInfo->suid, i + 1, len);
|
||||||
taosMemoryFreeClear(pOutput);
|
taosMemoryFreeClear(pOutput);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d, need persist", vid,
|
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid,
|
||||||
pRSmaInfo->suid, i + 1, len);
|
pRSmaInfo->suid, i + 1, len);
|
||||||
#if 0
|
|
||||||
if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) {
|
|
||||||
smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid,
|
|
||||||
i + 1, terrstr(terrno));
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (!isFileCreated) {
|
if (!isFileCreated) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
|
@ -1084,11 +1090,11 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
|
|
||||||
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
|
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
|
||||||
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
|
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
|
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
|
||||||
pRSmaInfo->suid, i + 1, headLen, toffset);
|
pRSmaInfo->suid, i + 1, headLen, toffset);
|
||||||
tdAppendTFile(&tFile, pOutput, len, &toffset);
|
tdAppendTFile(&tFile, pOutput, len, &toffset);
|
||||||
smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid,
|
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid,
|
||||||
i + 1, len, toffset);
|
pRSmaInfo->suid, i + 1, len, toffset);
|
||||||
|
|
||||||
taosMemoryFree(pOutput);
|
taosMemoryFree(pOutput);
|
||||||
}
|
}
|
||||||
|
@ -1098,13 +1104,13 @@ static void *tdRSmaPersistExec(void *param) {
|
||||||
_normal:
|
_normal:
|
||||||
if (isFileCreated) {
|
if (isFileCreated) {
|
||||||
if (tdUpdateTFileHeader(&tFile) < 0) {
|
if (tdUpdateTFileHeader(&tFile) < 0) {
|
||||||
smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
tdCloseTFile(&tFile);
|
tdCloseTFile(&tFile);
|
||||||
tdRemoveTFile(&tFile);
|
tdRemoveTFile(&tFile);
|
||||||
goto _err;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
|
||||||
}
|
}
|
||||||
|
|
||||||
tdCloseTFile(&tFile);
|
tdCloseTFile(&tFile);
|
||||||
|
@ -1114,10 +1120,10 @@ _normal:
|
||||||
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]);
|
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]);
|
||||||
strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
|
strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
|
||||||
if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) {
|
if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) {
|
||||||
smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
smaError("vgId:%d, rsma, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
||||||
goto _err;
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
smaDebug("vgId:%d, rsma, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -1129,13 +1135,13 @@ _end:
|
||||||
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||||
TASK_TRIGGER_STAT_INACTIVE,
|
TASK_TRIGGER_STAT_INACTIVE,
|
||||||
TASK_TRIGGER_STAT_ACTIVE)) {
|
TASK_TRIGGER_STAT_ACTIVE)) {
|
||||||
smaDebug("vgId:%d, persist task is active again", vid);
|
smaDebug("vgId:%d, rsma persist task is active again", vid);
|
||||||
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
|
||||||
TASK_TRIGGER_STAT_CANCELLED,
|
TASK_TRIGGER_STAT_CANCELLED,
|
||||||
TASK_TRIGGER_STAT_FINISHED)) {
|
TASK_TRIGGER_STAT_FINISHED)) {
|
||||||
smaDebug("vgId:%d, persist task is cancelled", vid);
|
smaDebug("vgId:%d, rsma persist task is cancelled", vid);
|
||||||
} else {
|
} else {
|
||||||
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
smaWarn("vgId:%d, rsma persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
|
||||||
|
@ -1179,9 +1185,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
|
||||||
*/
|
*/
|
||||||
static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||||
SRSmaStat *rsmaStat = param;
|
SRSmaStat *rsmaStat = param;
|
||||||
int64_t refId = rsmaStat->refId;
|
SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, rsmaStat->refId);
|
||||||
|
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, refId);
|
|
||||||
if (!pRSmaStat) {
|
if (!pRSmaStat) {
|
||||||
smaDebug("rsma persistence task not start since already destroyed");
|
smaDebug("rsma persistence task not start since already destroyed");
|
||||||
return;
|
return;
|
||||||
|
@ -1221,5 +1226,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
|
||||||
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
|
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
|
||||||
} break;
|
} break;
|
||||||
}
|
}
|
||||||
taosReleaseRef(smaMgmt.smaRef, refId);
|
taosReleaseRef(smaMgmt.smaRef, rsmaStat->refId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,20 @@ static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName);
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
|
||||||
|
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
pVal->literal = NULL;
|
||||||
|
pVal->isDuration = false;
|
||||||
|
pVal->translate = true;
|
||||||
|
pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT;
|
||||||
|
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes;
|
||||||
|
pVal->node.resType.precision = precision;
|
||||||
|
pVal->datum.i = (int64_t)precision;
|
||||||
|
pVal->typeData = (int64_t)precision;
|
||||||
|
|
||||||
|
nodesListMakeAppend(pList, (SNode*)pVal);
|
||||||
|
}
|
||||||
|
|
||||||
// There is only one parameter of numeric type, and the return type is parameter type
|
// There is only one parameter of numeric type, and the return type is parameter type
|
||||||
static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
@ -220,8 +234,20 @@ static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
// pseudo column do not need to check parameters
|
||||||
|
|
||||||
|
//add database precision as param
|
||||||
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
|
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// pseudo column do not need to check parameters
|
// pseudo column do not need to check parameters
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1379,20 +1405,6 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void static addDbPrecisonParam(SNodeList* pList, uint8_t precision) {
|
|
||||||
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
|
||||||
pVal->literal = NULL;
|
|
||||||
pVal->isDuration = false;
|
|
||||||
pVal->translate = true;
|
|
||||||
pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT;
|
|
||||||
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes;
|
|
||||||
pVal->node.resType.precision = precision;
|
|
||||||
pVal->datum.i = (int64_t)precision;
|
|
||||||
pVal->typeData = (int64_t)precision;
|
|
||||||
|
|
||||||
nodesListAppend(pList, (SNode*)pVal);
|
|
||||||
}
|
|
||||||
|
|
||||||
void static addTimezoneParam(SNodeList* pList) {
|
void static addTimezoneParam(SNodeList* pList) {
|
||||||
char buf[6] = {0};
|
char buf[6] = {0};
|
||||||
time_t t = taosTime(NULL);
|
time_t t = taosTime(NULL);
|
||||||
|
@ -1460,6 +1472,10 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
|
||||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//add database precision as param
|
||||||
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
|
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1478,7 +1494,7 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
|
||||||
|
|
||||||
//add database precision as param
|
//add database precision as param
|
||||||
uint8_t dbPrec = pFunc->node.resType.precision;
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
addDbPrecisonParam(pFunc->pParameterList, dbPrec);
|
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
|
||||||
pFunc->node.resType =
|
pFunc->node.resType =
|
||||||
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
|
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||||
|
@ -1506,7 +1522,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
||||||
|
|
||||||
//add database precision as param
|
//add database precision as param
|
||||||
uint8_t dbPrec = pFunc->node.resType.precision;
|
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||||
addDbPrecisonParam(pFunc->pParameterList, dbPrec);
|
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1934,6 +1950,19 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "last_row",
|
.name = "last_row",
|
||||||
.type = FUNCTION_TYPE_LAST_ROW,
|
.type = FUNCTION_TYPE_LAST_ROW,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateFirstLast,
|
||||||
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = lastFunction,
|
||||||
|
.finalizeFunc = firstLastFinalize,
|
||||||
|
.pPartialFunc = "_last_partial",
|
||||||
|
.pMergeFunc = "_last_merge",
|
||||||
|
.combineFunc = lastCombine,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_cache_last_row",
|
||||||
|
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateLastRow,
|
.translateFunc = translateLastRow,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = minmaxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
|
@ -2462,7 +2491,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "now",
|
.name = "now",
|
||||||
.type = FUNCTION_TYPE_NOW,
|
.type = FUNCTION_TYPE_NOW,
|
||||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||||
.translateFunc = translateTimePseudoColumn,
|
.translateFunc = translateNowToday,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
.sprocessFunc = nowFunction,
|
.sprocessFunc = nowFunction,
|
||||||
|
@ -2472,7 +2501,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "today",
|
.name = "today",
|
||||||
.type = FUNCTION_TYPE_TODAY,
|
.type = FUNCTION_TYPE_TODAY,
|
||||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||||
.translateFunc = translateTimePseudoColumn,
|
.translateFunc = translateNowToday,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
.sprocessFunc = todayFunction,
|
.sprocessFunc = todayFunction,
|
||||||
|
|
|
@ -4283,7 +4283,7 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
||||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
// TODO: process timeUnit for different db precisions
|
// TODO: process timeUnit for different db precisions
|
||||||
int32_t timeUnit = 1000;
|
int32_t timeUnit = 1;
|
||||||
if (pCtx->numOfParams == 5) { // TODO: param number incorrect
|
if (pCtx->numOfParams == 5) { // TODO: param number incorrect
|
||||||
timeUnit = pCtx->param[3].param.i;
|
timeUnit = pCtx->param[3].param.i;
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,16 +154,12 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
|
||||||
return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot);
|
return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot);
|
||||||
}
|
}
|
||||||
|
|
||||||
static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pScanPseudoCols,
|
static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
|
||||||
SNodeList* pScanCols, int8_t tableType) {
|
int8_t tableType) {
|
||||||
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
|
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
|
||||||
return SCAN_TYPE_STREAM;
|
return SCAN_TYPE_STREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSelect->hasLastRowFunc) {
|
|
||||||
return SCAN_TYPE_LAST_ROW;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL == pScanCols) {
|
if (NULL == pScanCols) {
|
||||||
// select count(*) from t
|
// select count(*) from t
|
||||||
return NULL == pScanPseudoCols
|
return NULL == pScanPseudoCols
|
||||||
|
@ -279,7 +275,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
|
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
|
||||||
}
|
}
|
||||||
|
|
||||||
pScan->scanType = getScanType(pCxt, pSelect, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType);
|
pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);
|
code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);
|
||||||
|
@ -474,6 +470,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pAgg->hasLastRow = pSelect->hasLastRowFunc;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// set grouyp keys, agg funcs and having conditions
|
// set grouyp keys, agg funcs and having conditions
|
||||||
|
|
|
@ -1622,6 +1622,46 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
|
||||||
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
|
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
|
||||||
|
NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||||
|
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
|
||||||
|
NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pFunc = NULL;
|
||||||
|
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
|
||||||
|
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
|
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
|
||||||
|
|
||||||
|
if (NULL == pAgg) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pNode = NULL;
|
||||||
|
FOREACH(pNode, pAgg->pAggFuncs) {
|
||||||
|
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||||
|
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
|
||||||
|
pFunc->functionName[len] = '\0';
|
||||||
|
fmGetFuncInfo(pFunc, NULL, 0);
|
||||||
|
}
|
||||||
|
pAgg->hasLastRow = false;
|
||||||
|
|
||||||
|
((SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))->scanType = SCAN_TYPE_LAST_ROW;
|
||||||
|
|
||||||
|
pCxt->optimized = true;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// merge projects
|
// merge projects
|
||||||
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
|
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
||||||
|
@ -1710,7 +1750,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
||||||
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
||||||
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
||||||
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}
|
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
|
||||||
|
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -1344,7 +1344,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
|
||||||
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
|
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
|
||||||
&pMerge->pMergeKeys);
|
&pMerge->pMergeKeys);
|
||||||
}
|
}
|
||||||
|
|
|
@ -197,6 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
|
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||||
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
|
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||||
|
return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||||
|
@ -431,7 +433,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
|
||||||
SNodeList* pMergeKeys = NULL;
|
SNodeList* pMergeKeys = NULL;
|
||||||
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
|
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false);
|
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
nodesDestroyList(pMergeKeys);
|
nodesDestroyList(pMergeKeys);
|
||||||
|
@ -887,6 +889,16 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
|
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||||
|
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
|
||||||
|
}
|
||||||
|
++(pCxt->groupId);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||||
if (pCxt->pPlanCxt->rSmaQuery) {
|
if (pCxt->pPlanCxt->rSmaQuery) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -905,6 +917,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||||
code = stbSplSplitJoinNode(pCxt, &info);
|
code = stbSplSplitJoinNode(pCxt, &info);
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||||
|
code = stbSplSplitPartitionNode(pCxt, &info);
|
||||||
|
break;
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
code = stbSplSplitAggNode(pCxt, &info);
|
code = stbSplSplitAggNode(pCxt, &info);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -99,6 +99,8 @@ TEST_F(PlanBasicTest, lastRowFunc) {
|
||||||
run("SELECT LAST_ROW(c1, c2) FROM t1");
|
run("SELECT LAST_ROW(c1, c2) FROM t1");
|
||||||
|
|
||||||
run("SELECT LAST_ROW(c1) FROM st1");
|
run("SELECT LAST_ROW(c1) FROM st1");
|
||||||
|
|
||||||
|
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(PlanBasicTest, sampleFunc) {
|
TEST_F(PlanBasicTest, sampleFunc) {
|
||||||
|
|
|
@ -1116,7 +1116,8 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
|
||||||
|
|
||||||
int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
int32_t timePrec = GET_PARAM_PRECISON(pInput);
|
int64_t timePrec;
|
||||||
|
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
||||||
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
||||||
|
@ -1514,7 +1515,10 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI);
|
int64_t timePrec;
|
||||||
|
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData);
|
||||||
|
|
||||||
|
int64_t ts = taosGetTimestamp(timePrec);
|
||||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
colDataAppendInt64(pOutput->columnData, i, &ts);
|
colDataAppendInt64(pOutput->columnData, i, &ts);
|
||||||
}
|
}
|
||||||
|
@ -1523,7 +1527,10 @@ int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI);
|
int64_t timePrec;
|
||||||
|
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData);
|
||||||
|
|
||||||
|
int64_t ts = taosGetTimestampToday(timePrec);
|
||||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
colDataAppendInt64(pOutput->columnData, i, &ts);
|
colDataAppendInt64(pOutput->columnData, i, &ts);
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,15 +38,15 @@ class TDTestCase:
|
||||||
tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
|
tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
|
||||||
tdSql.checkRows(10)
|
tdSql.checkRows(10)
|
||||||
if j in ['LT' ,'lt','Lt','lT']:
|
if j in ['LT' ,'lt','Lt','lT']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
elif j in ['GT','gt', 'Gt','gT']:
|
elif j in ['GT','gt', 'Gt','gT']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
||||||
elif j in ['LE','le','Le','lE']:
|
elif j in ['LE','le','Le','lE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
elif j in [ 'GE','ge','Ge','gE']:
|
elif j in [ 'GE','ge','Ge','gE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,), (0,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (0,), (0,), (0,), (0,), (0,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
for i in float_list:
|
for i in float_list:
|
||||||
|
@ -54,11 +54,11 @@ class TDTestCase:
|
||||||
tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
|
tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
|
||||||
tdSql.checkRows(10)
|
tdSql.checkRows(10)
|
||||||
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
|
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
|
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
|
|
||||||
|
@ -66,34 +66,34 @@ class TDTestCase:
|
||||||
for i in error_column_list:
|
for i in error_column_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
tdSql.error(f"select stateduration({i},{j},5) from test")
|
tdSql.error(f"select stateduration({i},{j},5) from test")
|
||||||
|
|
||||||
error_param_list = ['a',1]
|
error_param_list = ['a',1]
|
||||||
for i in error_param_list:
|
for i in error_param_list:
|
||||||
tdSql.error(f"select stateduration(col1,{i},5) from test")
|
tdSql.error(f"select stateduration(col1,{i},5) from test")
|
||||||
|
|
||||||
# timestamp = 1s, time_unit =1s
|
# timestamp = 1s, time_unit =1s
|
||||||
tdSql.execute('''create table test1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
tdSql.execute('''create table test1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
||||||
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
|
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
|
||||||
for i in range(self.row_num):
|
for i in range(self.row_num):
|
||||||
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
||||||
% (self.ts + i*1000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
% (self.ts + i*1000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
||||||
|
|
||||||
for i in integer_list:
|
for i in integer_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
tdSql.query(f"select stateduration(col{i},'{j}',5) from test1")
|
tdSql.query(f"select stateduration(col{i},'{j}',5) from test1")
|
||||||
tdSql.checkRows(10)
|
tdSql.checkRows(10)
|
||||||
# print(tdSql.queryResult)
|
# print(tdSql.queryResult)
|
||||||
if j in ['LT' ,'lt','Lt','lT']:
|
if j in ['LT' ,'lt','Lt','lT']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
elif j in ['GT','gt', 'Gt','gT']:
|
elif j in ['GT','gt', 'Gt','gT']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
|
||||||
elif j in ['LE','le','Le','lE']:
|
elif j in ['LE','le','Le','lE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
elif j in [ 'GE','ge','Ge','gE']:
|
elif j in [ 'GE','ge','Ge','gE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,), (5000,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
for i in float_list:
|
for i in float_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
|
@ -101,22 +101,22 @@ class TDTestCase:
|
||||||
tdSql.checkRows(10)
|
tdSql.checkRows(10)
|
||||||
print(tdSql.queryResult)
|
print(tdSql.queryResult)
|
||||||
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
|
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
|
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (5000,), (6000,), (7000,), (8000,), (9000,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
|
|
||||||
|
|
||||||
# timestamp = 1m, time_unit =1m
|
# timestamp = 1m, time_unit =1m
|
||||||
tdSql.execute('''create table test2(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
tdSql.execute('''create table test2(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
||||||
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
|
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
|
||||||
for i in range(self.row_num):
|
for i in range(self.row_num):
|
||||||
tdSql.execute("insert into test2 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
tdSql.execute("insert into test2 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
||||||
% (self.ts + i*1000*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
% (self.ts + i*1000*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
||||||
|
|
||||||
for i in integer_list:
|
for i in integer_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test2")
|
tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test2")
|
||||||
|
@ -132,7 +132,7 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
for i in float_list:
|
for i in float_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
|
@ -147,14 +147,14 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
|
|
||||||
# timestamp = 1h, time_unit =1h
|
# timestamp = 1h, time_unit =1h
|
||||||
tdSql.execute('''create table test3(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
tdSql.execute('''create table test3(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
||||||
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
|
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
|
||||||
for i in range(self.row_num):
|
for i in range(self.row_num):
|
||||||
tdSql.execute("insert into test3 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
tdSql.execute("insert into test3 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
||||||
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
||||||
|
|
||||||
for i in integer_list:
|
for i in integer_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from test3")
|
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from test3")
|
||||||
|
@ -170,7 +170,7 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
for i in float_list:
|
for i in float_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
|
@ -202,7 +202,7 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,), (300,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,), (300,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (-1,), (0,), (60,), (120,), (180,), (240,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (-1,), (0,), (60,), (120,), (180,), (240,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
for i in float_list:
|
for i in float_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
|
@ -219,13 +219,13 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
|
|
||||||
# for stb
|
# for stb
|
||||||
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
||||||
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(t0 int)''')
|
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(t0 int)''')
|
||||||
tdSql.execute('create table stb_1 using stb tags(1)')
|
tdSql.execute('create table stb_1 using stb tags(1)')
|
||||||
for i in range(self.row_num):
|
for i in range(self.row_num):
|
||||||
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
|
||||||
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
|
||||||
|
|
||||||
for i in integer_list:
|
for i in integer_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from stb")
|
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from stb")
|
||||||
|
@ -241,7 +241,7 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
|
||||||
elif j in ['NE','ne','Ne','nE']:
|
elif j in ['NE','ne','Ne','nE']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
for i in float_list:
|
for i in float_list:
|
||||||
for j in self.param_list:
|
for j in self.param_list:
|
||||||
|
@ -256,10 +256,10 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
|
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
|
||||||
elif j in ['EQ','eq','Eq','eQ']:
|
elif j in ['EQ','eq','Eq','eQ']:
|
||||||
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
|
|
@ -9,28 +9,22 @@ from util.sql import *
|
||||||
from util.cases import *
|
from util.cases import *
|
||||||
from util.dnodes import TDDnodes
|
from util.dnodes import TDDnodes
|
||||||
from util.dnodes import TDDnode
|
from util.dnodes import TDDnode
|
||||||
|
from util.cluster import *
|
||||||
|
|
||||||
import time
|
import time
|
||||||
import socket
|
import socket
|
||||||
import subprocess
|
import subprocess
|
||||||
|
sys.path.append("./6-cluster")
|
||||||
|
|
||||||
class MyDnodes(TDDnodes):
|
from clusterCommonCreate import *
|
||||||
def __init__(self ,dnodes_lists):
|
from clusterCommonCheck import *
|
||||||
super(MyDnodes,self).__init__()
|
|
||||||
self.dnodes = dnodes_lists # dnode must be TDDnode instance
|
|
||||||
self.simDeployed = False
|
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
noConn = True
|
|
||||||
|
|
||||||
def init(self,conn ,logSql):
|
def init(self,conn ,logSql):
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
self.TDDnodes = None
|
tdSql.init(conn.cursor())
|
||||||
self.depoly_cluster(5)
|
self.host = socket.gethostname()
|
||||||
self.master_dnode = self.TDDnodes.dnodes[0]
|
|
||||||
self.host=self.master_dnode.cfgDict["fqdn"]
|
|
||||||
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
|
|
||||||
tdSql.init(conn1.cursor())
|
|
||||||
|
|
||||||
|
|
||||||
def getBuildPath(self):
|
def getBuildPath(self):
|
||||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
@ -41,90 +35,12 @@ class TDTestCase:
|
||||||
projPath = selfPath[:selfPath.find("tests")]
|
projPath = selfPath[:selfPath.find("tests")]
|
||||||
|
|
||||||
for root, dirs, files in os.walk(projPath):
|
for root, dirs, files in os.walk(projPath):
|
||||||
if ("taosd" in files or "taosd.exe" in files):
|
if ("taosd" in files):
|
||||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
if ("packaging" not in rootRealPath):
|
if ("packaging" not in rootRealPath):
|
||||||
buildPath = root[:len(root) - len("/build/bin")]
|
buildPath = root[:len(root) - len("/build/bin")]
|
||||||
break
|
break
|
||||||
return buildPath
|
return buildPath
|
||||||
|
|
||||||
|
|
||||||
def depoly_cluster(self ,dnodes_nums):
|
|
||||||
|
|
||||||
testCluster = False
|
|
||||||
valgrind = 0
|
|
||||||
hostname = socket.gethostname()
|
|
||||||
dnodes = []
|
|
||||||
start_port = 6030
|
|
||||||
start_port_sec = 6130
|
|
||||||
|
|
||||||
for num in range(1, dnodes_nums+1):
|
|
||||||
dnode = TDDnode(num)
|
|
||||||
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
|
|
||||||
dnode.addExtraCfg("fqdn", f"{hostname}")
|
|
||||||
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
|
|
||||||
dnode.addExtraCfg("monitorFqdn", hostname)
|
|
||||||
dnode.addExtraCfg("monitorPort", 7043)
|
|
||||||
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
|
|
||||||
|
|
||||||
dnodes.append(dnode)
|
|
||||||
|
|
||||||
self.TDDnodes = MyDnodes(dnodes)
|
|
||||||
self.TDDnodes.init("")
|
|
||||||
self.TDDnodes.setTestCluster(testCluster)
|
|
||||||
self.TDDnodes.setValgrind(valgrind)
|
|
||||||
self.TDDnodes.stopAll()
|
|
||||||
for dnode in self.TDDnodes.dnodes:
|
|
||||||
self.TDDnodes.deploy(dnode.index,{})
|
|
||||||
|
|
||||||
for dnode in self.TDDnodes.dnodes:
|
|
||||||
self.TDDnodes.starttaosd(dnode.index)
|
|
||||||
|
|
||||||
# create cluster
|
|
||||||
for dnode in self.TDDnodes.dnodes[1:]:
|
|
||||||
# print(dnode.cfgDict)
|
|
||||||
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
|
|
||||||
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
|
|
||||||
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
|
|
||||||
cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s \"create dnode \\\"{dnode_id}\\\"\""
|
|
||||||
print(cmd)
|
|
||||||
os.system(cmd)
|
|
||||||
|
|
||||||
time.sleep(2)
|
|
||||||
tdLog.info(" create cluster done! ")
|
|
||||||
|
|
||||||
def five_dnode_one_mnode(self):
|
|
||||||
tdSql.query("show dnodes;")
|
|
||||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
|
||||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
|
||||||
tdSql.checkData(0,4,'ready')
|
|
||||||
tdSql.checkData(4,4,'ready')
|
|
||||||
tdSql.query("show mnodes;")
|
|
||||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
|
||||||
tdSql.checkData(0,2,'leader')
|
|
||||||
tdSql.checkData(0,3,'ready')
|
|
||||||
|
|
||||||
|
|
||||||
tdSql.error("create mnode on dnode 1;")
|
|
||||||
tdSql.error("drop mnode on dnode 1;")
|
|
||||||
|
|
||||||
tdSql.execute("drop database if exists db")
|
|
||||||
tdSql.execute("create database if not exists db replica 1 duration 300")
|
|
||||||
tdSql.execute("use db")
|
|
||||||
tdSql.execute(
|
|
||||||
'''create table stb1
|
|
||||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
|
||||||
tags (t1 int)
|
|
||||||
'''
|
|
||||||
)
|
|
||||||
tdSql.execute(
|
|
||||||
'''
|
|
||||||
create table t1
|
|
||||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
|
||||||
'''
|
|
||||||
)
|
|
||||||
for i in range(4):
|
|
||||||
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
|
|
||||||
|
|
||||||
def five_dnode_two_mnode(self):
|
def five_dnode_two_mnode(self):
|
||||||
tdSql.query("show dnodes;")
|
tdSql.query("show dnodes;")
|
||||||
|
@ -187,6 +103,34 @@ class TDTestCase:
|
||||||
tdSql.error("create mnode on dnode 2")
|
tdSql.error("create mnode on dnode 2")
|
||||||
tdSql.query("show dnodes;")
|
tdSql.query("show dnodes;")
|
||||||
print(tdSql.queryResult)
|
print(tdSql.queryResult)
|
||||||
|
clusterComCheck.checkDnodes(5)
|
||||||
|
# restart all taosd
|
||||||
|
tdDnodes=cluster.dnodes
|
||||||
|
|
||||||
|
# stop follower
|
||||||
|
tdLog.info("stop follower")
|
||||||
|
tdDnodes[1].stoptaosd()
|
||||||
|
if cluster.checkConnectStatus(0) :
|
||||||
|
print("cluster also work")
|
||||||
|
|
||||||
|
# start follower
|
||||||
|
tdLog.info("start follower")
|
||||||
|
tdDnodes[1].starttaosd()
|
||||||
|
if clusterComCheck.checkMnodeStatus(2) :
|
||||||
|
print("both mnodes are ready")
|
||||||
|
|
||||||
|
# stop leader
|
||||||
|
tdLog.info("stop leader")
|
||||||
|
tdDnodes[0].stoptaosd()
|
||||||
|
try:
|
||||||
|
cluster.checkConnectStatus(2)
|
||||||
|
tdLog.exit(" The election still succeeds when leader of both mnodes are killed ")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
tdLog.info("start leader")
|
||||||
|
tdDnodes[0].starttaosd()
|
||||||
|
if clusterComCheck.checkMnodeStatus(2) :
|
||||||
|
print("both mnodes are ready")
|
||||||
|
|
||||||
# # fisrt drop follower of mnode
|
# # fisrt drop follower of mnode
|
||||||
# BUG
|
# BUG
|
||||||
|
@ -229,8 +173,6 @@ class TDTestCase:
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
print(self.master_dnode.cfgDict)
|
|
||||||
self.five_dnode_one_mnode()
|
|
||||||
self.five_dnode_two_mnode()
|
self.five_dnode_two_mnode()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ class TDTestCase:
|
||||||
tdLog.info("Confirm the status of the dnode again")
|
tdLog.info("Confirm the status of the dnode again")
|
||||||
tdSql.error("create mnode on dnode 2")
|
tdSql.error("create mnode on dnode 2")
|
||||||
tdSql.query("show dnodes;")
|
tdSql.query("show dnodes;")
|
||||||
print(tdSql.queryResult)
|
# print(tdSql.queryResult)
|
||||||
clusterComCheck.checkDnodes(dnodenumbers)
|
clusterComCheck.checkDnodes(dnodenumbers)
|
||||||
# restart all taosd
|
# restart all taosd
|
||||||
tdDnodes=cluster.dnodes
|
tdDnodes=cluster.dnodes
|
||||||
|
@ -108,18 +108,6 @@ class TDTestCase:
|
||||||
tdDnodes[0].starttaosd()
|
tdDnodes[0].starttaosd()
|
||||||
clusterComCheck.checkMnodeStatus(3)
|
clusterComCheck.checkMnodeStatus(3)
|
||||||
|
|
||||||
tdLog.info("Take turns stopping all dnodes ")
|
|
||||||
# seperate vnode and mnode in different dnodes.
|
|
||||||
# create database and stable
|
|
||||||
stopcount =0
|
|
||||||
while stopcount <= 2:
|
|
||||||
tdLog.info("first restart loop")
|
|
||||||
for i in range(dnodenumbers):
|
|
||||||
tdDnodes[i].stoptaosd()
|
|
||||||
tdDnodes[i].starttaosd()
|
|
||||||
stopcount+=1
|
|
||||||
clusterComCheck.checkDnodes(dnodenumbers)
|
|
||||||
clusterComCheck.checkMnodeStatus(3)
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
# print(self.master_dnode.cfgDict)
|
# print(self.master_dnode.cfgDict)
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.dnodes import TDDnodes
|
||||||
|
from util.dnodes import TDDnode
|
||||||
|
from util.cluster import *
|
||||||
|
from test import tdDnodes
|
||||||
|
sys.path.append("./6-cluster")
|
||||||
|
|
||||||
|
from clusterCommonCreate import *
|
||||||
|
from clusterCommonCheck import *
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import subprocess
|
||||||
|
from multiprocessing import Process
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
|
||||||
|
def init(self,conn ,logSql):
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
self.host = socket.gethostname()
|
||||||
|
|
||||||
|
|
||||||
|
def getBuildPath(self):
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
if ("community" in selfPath):
|
||||||
|
projPath = selfPath[:selfPath.find("community")]
|
||||||
|
else:
|
||||||
|
projPath = selfPath[:selfPath.find("tests")]
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(projPath):
|
||||||
|
if ("taosd" in files):
|
||||||
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
|
if ("packaging" not in rootRealPath):
|
||||||
|
buildPath = root[:len(root) - len("/build/bin")]
|
||||||
|
break
|
||||||
|
return buildPath
|
||||||
|
|
||||||
|
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
paraDict = {'dbName': 'db',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 4,
|
||||||
|
'replica': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbNum': 1,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 10,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1}
|
||||||
|
dnodenumbers=int(dnodenumbers)
|
||||||
|
mnodeNums=int(mnodeNums)
|
||||||
|
dbNumbers = int(dnodenumbers * restartNumber)
|
||||||
|
|
||||||
|
tdLog.info("first check dnode and mnode")
|
||||||
|
tdSql.query("show dnodes;")
|
||||||
|
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||||
|
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||||
|
clusterComCheck.checkDnodes(dnodenumbers)
|
||||||
|
clusterComCheck.checkMnodeStatus(1)
|
||||||
|
|
||||||
|
# fisr add three mnodes;
|
||||||
|
tdLog.info("fisr add three mnodes and check mnode status")
|
||||||
|
tdSql.execute("create mnode on dnode 2")
|
||||||
|
clusterComCheck.checkMnodeStatus(2)
|
||||||
|
tdSql.execute("create mnode on dnode 3")
|
||||||
|
clusterComCheck.checkMnodeStatus(3)
|
||||||
|
|
||||||
|
# add some error operations and
|
||||||
|
tdLog.info("Confirm the status of the dnode again")
|
||||||
|
tdSql.error("create mnode on dnode 2")
|
||||||
|
tdSql.query("show dnodes;")
|
||||||
|
# print(tdSql.queryResult)
|
||||||
|
clusterComCheck.checkDnodes(dnodenumbers)
|
||||||
|
# restart all taosd
|
||||||
|
tdDnodes=cluster.dnodes
|
||||||
|
|
||||||
|
tdLog.info("Take turns stopping all dnodes ")
|
||||||
|
# seperate vnode and mnode in different dnodes.
|
||||||
|
# create database and stable
|
||||||
|
stopcount =0
|
||||||
|
while stopcount <= 2:
|
||||||
|
tdLog.info(" restart loop: %d"%stopcount )
|
||||||
|
for i in range(dnodenumbers):
|
||||||
|
tdDnodes[i].stoptaosd()
|
||||||
|
tdDnodes[i].starttaosd()
|
||||||
|
stopcount+=1
|
||||||
|
clusterComCheck.checkDnodes(dnodenumbers)
|
||||||
|
clusterComCheck.checkMnodeStatus(3)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
# print(self.master_dnode.cfgDict)
|
||||||
|
self.fiveDnodeThreeMnode(5,3,1)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -116,17 +116,18 @@ python3 ./test.py -f 2-query/function_null.py
|
||||||
python3 ./test.py -f 2-query/queryQnode.py
|
python3 ./test.py -f 2-query/queryQnode.py
|
||||||
|
|
||||||
python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||||
python3 ./test.py -f 6-cluster/5dnode2mnode.py
|
python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
|
||||||
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
|
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
|
||||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
|
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
|
||||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
|
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
|
||||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
|
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
|
||||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
|
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
|
||||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
|
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
|
||||||
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
|
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
|
||||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
|
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
|
||||||
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
|
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
|
||||||
python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
|
# python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
|
||||||
|
|
||||||
|
|
||||||
python3 ./test.py -f 7-tmq/basic5.py
|
python3 ./test.py -f 7-tmq/basic5.py
|
||||||
|
|
Loading…
Reference in New Issue