Merge remote-tracking branch 'origin/3.0' into enh/opt-transport
This commit is contained in:
commit
e94a4a6d1e
|
@ -7,7 +7,17 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
|
||||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
|
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
|
||||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
|
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
|
||||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
|
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
|
||||||
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
COMMAND ${CMAKE_COMMAND} -E echo firstEp localhost:6030 > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo fqdn localhost >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo serverPort 6030 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo debugFlag 135 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo asyncLog 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo supportVnodes 1024 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo numOfLogLines 300000000 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo logKeepDays -1 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo checkpointInterval 60 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo snodeAddress 127.0.0.1:873 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
|
|
@ -208,3 +208,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
|
||||||
### 8. 创建完成
|
### 8. 创建完成
|
||||||
|
|
||||||
点击 **提交** 按钮,完成创建 OPC UA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
点击 **提交** 按钮,完成创建 OPC UA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||||
|
|
||||||
|
## 增加数据点位
|
||||||
|
|
||||||
|
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
在弹出的表单中,填写数据点位的信息。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
点击 **确定** 按钮,完成数据点位的追加。
|
|
@ -182,3 +182,15 @@ CSV 文件中的每个 Row 配置一个 OPC 数据点位。Row 的规则如下
|
||||||
### 7. 创建完成
|
### 7. 创建完成
|
||||||
|
|
||||||
点击 **提交** 按钮,完成创建 OPC DA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
点击 **提交** 按钮,完成创建 OPC DA 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||||
|
|
||||||
|
## 增加数据点位
|
||||||
|
|
||||||
|
在任务运行中,点击 **编辑**,点击 **增加数据点位** 按钮,追加数据点位到 CSV 文件中。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
在弹出的表单中,填写数据点位的信息。
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
点击 **确定** 按钮,完成数据点位的追加。
|
Binary file not shown.
After Width: | Height: | Size: 175 KiB |
Binary file not shown.
After Width: | Height: | Size: 67 KiB |
|
@ -522,6 +522,6 @@ Offset 结构体提供了获取当前消息所属的数据库,主题和分区
|
||||||
|
|
||||||
## 附录
|
## 附录
|
||||||
- Rust 连接器文档:https://docs.rs/taos
|
- Rust 连接器文档:https://docs.rs/taos
|
||||||
- Rust 连接器项目地址: https://github.com/taosdata/rust-connector-taos
|
- Rust 连接器项目地址: https://github.com/taosdata/taos-connector-rust
|
||||||
- deadpool 连接池: https://crates.io/crates/deadpool
|
- deadpool 连接池: https://crates.io/crates/deadpool
|
||||||
- r2d2 连接池: https://crates.io/crates/r2d2
|
- r2d2 连接池: https://crates.io/crates/r2d2
|
||||||
|
|
|
@ -153,7 +153,6 @@ char *tTagValToData(const STagVal *pTagVal, bool isJson);
|
||||||
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
|
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
|
||||||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
||||||
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
||||||
void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid);
|
|
||||||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
||||||
int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf);
|
int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf);
|
||||||
|
|
||||||
|
|
|
@ -116,23 +116,14 @@ function install_bin() {
|
||||||
${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin && ${csudo}chmod 0555 ${install_main_dir}/bin/*
|
${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin && ${csudo}chmod 0555 ${install_main_dir}/bin/*
|
||||||
|
|
||||||
#Make link
|
#Make link
|
||||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
|
[ -x ${install_main_dir}/bin/${clientName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName2} ${bin_link_dir}/${clientName2} || :
|
||||||
if [ "$osType" != "Darwin" ]; then
|
if [ "$osType" != "Darwin" ]; then
|
||||||
[ -x ${install_main_dir}/bin/${demoName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${demoName2} ${bin_link_dir}/${demoName2} || :
|
[ -x ${install_main_dir}/bin/${demoName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${demoName2} ${bin_link_dir}/${demoName2} || :
|
||||||
fi
|
fi
|
||||||
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/${uninstallScript} || :
|
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/${uninstallScript} || :
|
||||||
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
|
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
|
||||||
|
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || :
|
||||||
if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then
|
[ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || :
|
||||||
#Make link
|
|
||||||
[ -x ${install_main_dir}/bin/${clientName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName2} ${bin_link_dir}/${clientName2} || :
|
|
||||||
if [ "$osType" != "Darwin" ]; then
|
|
||||||
[ -x ${install_main_dir}/bin/${demoName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${demoName2} ${bin_link_dir}/${demoName2} || :
|
|
||||||
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || :
|
|
||||||
[ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || :
|
|
||||||
fi
|
|
||||||
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/${uninstallScript2} || :
|
|
||||||
fi
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function clean_lib() {
|
function clean_lib() {
|
||||||
|
|
|
@ -1120,7 +1120,8 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
SHashObj* pVgroupHashmap = NULL;
|
SHashObj* pVgroupHashmap = NULL;
|
||||||
|
SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
|
||||||
|
RAW_NULL_CHECK(pTagList);
|
||||||
RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
|
RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
|
||||||
uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
|
uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
|
||||||
|
|
||||||
|
@ -1186,6 +1187,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
}
|
}
|
||||||
pCreateReq->ctb.suid = pTableMeta->uid;
|
pCreateReq->ctb.suid = pTableMeta->uid;
|
||||||
|
|
||||||
|
SArray* pTagVals = NULL;
|
||||||
|
code = tTagToValArray((STag *)pCreateReq->ctb.pTag, &pTagVals);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFreeClear(pTableMeta);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool rebuildTag = false;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
|
||||||
char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
|
char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
|
||||||
if (tName == NULL) {
|
if (tName == NULL) {
|
||||||
|
@ -1195,11 +1204,34 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
|
j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
|
||||||
SSchema* tag = &pTableMeta->schema[j];
|
SSchema* tag = &pTableMeta->schema[j];
|
||||||
if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
|
if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
|
||||||
tTagSetCid((STag*)pCreateReq->ctb.pTag, i, tag->colId);
|
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
|
||||||
|
if (pTagVal) {
|
||||||
|
if (pTagVal->cid != tag->colId){
|
||||||
|
pTagVal->cid = tag->colId;
|
||||||
|
rebuildTag = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name, (int)taosArrayGetSize(pTagVals), i, tag->colId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pTableMeta);
|
taosMemoryFreeClear(pTableMeta);
|
||||||
|
if (rebuildTag){
|
||||||
|
STag* ppTag = NULL;
|
||||||
|
code = tTagNew(pTagVals, 1, false, &ppTag);
|
||||||
|
taosArrayDestroy(pTagVals);
|
||||||
|
pTagVals = NULL;
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
if (NULL == taosArrayPush(pTagList, &ppTag)) {
|
||||||
|
tTagFree(ppTag);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
pCreateReq->ctb.pTag = (uint8_t*)ppTag;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pTagVals);
|
||||||
}
|
}
|
||||||
RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
|
RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
|
||||||
|
|
||||||
|
@ -1251,6 +1283,7 @@ end:
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
tDecoderClear(&coder);
|
tDecoderClear(&coder);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
|
taosArrayDestroyP(pTagList, taosMemoryFree);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -184,10 +184,16 @@ int32_t stmtBackupQueryFields(STscStmt* pStmt) {
|
||||||
|
|
||||||
int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
|
int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
|
||||||
pRes->fields = taosMemoryMalloc(size);
|
pRes->fields = taosMemoryMalloc(size);
|
||||||
pRes->userFields = taosMemoryMalloc(size);
|
if (pRes->fields == NULL) {
|
||||||
if (NULL == pRes->fields || NULL == pRes->userFields) {
|
|
||||||
STMT_ERR_RET(terrno);
|
STMT_ERR_RET(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRes->userFields = taosMemoryMalloc(size);
|
||||||
|
if (pRes->userFields == NULL) {
|
||||||
|
taosMemoryFreeClear(pRes->fields);
|
||||||
|
STMT_ERR_RET(terrno);
|
||||||
|
}
|
||||||
|
|
||||||
(void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
|
(void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
|
||||||
(void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
|
(void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
|
||||||
|
|
||||||
|
|
|
@ -1771,26 +1771,6 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) {
|
|
||||||
uint8_t *p = NULL;
|
|
||||||
int8_t isLarge = pTag->flags & TD_TAG_LARGE;
|
|
||||||
int16_t offset = 0;
|
|
||||||
|
|
||||||
if (isLarge) {
|
|
||||||
p = (uint8_t *)&((int16_t *)pTag->idx)[pTag->nTag];
|
|
||||||
} else {
|
|
||||||
p = (uint8_t *)&pTag->idx[pTag->nTag];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isLarge) {
|
|
||||||
offset = ((int16_t *)pTag->idx)[iTag];
|
|
||||||
} else {
|
|
||||||
offset = pTag->idx[iTag];
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t nt = tPutI16v(p + offset, cid);
|
|
||||||
}
|
|
||||||
|
|
||||||
// STSchema ========================================
|
// STSchema ========================================
|
||||||
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
|
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
|
||||||
STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
|
STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
|
||||||
|
|
|
@ -297,11 +297,22 @@ static void *vmOpenVnodeInThread(void *param) {
|
||||||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||||
char path[TSDB_FILENAME_LEN];
|
char path[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||||
setThreadName("open-vnodes");
|
setThreadName("open-vnodes");
|
||||||
|
|
||||||
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||||
SWrapperCfg *pCfg = &pThread->pCfgs[v];
|
SWrapperCfg *pCfg = &pThread->pCfgs[v];
|
||||||
|
if (pCfg->dropped) {
|
||||||
|
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||||
|
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId,
|
||||||
|
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||||
|
tmsgReportStartup("vnode-destroy", stepDesc);
|
||||||
|
|
||||||
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
||||||
|
vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0);
|
||||||
|
pThread->updateVnodesList = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
|
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
|
||||||
|
|
|
@ -1012,10 +1012,10 @@ _OVER:
|
||||||
|
|
||||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
TAOS_CHECK_RETURN (mndTransCheckConflict(pMnode, pTrans));
|
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
|
||||||
TAOS_CHECK_RETURN (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
|
TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
|
||||||
TAOS_CHECK_RETURN (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
|
TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
|
||||||
TAOS_CHECK_RETURN (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
|
TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1051,7 +1051,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
||||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
code = tmsgSendReq(&epSet, &rpcMsg);
|
code = tmsgSendReq(&epSet, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
|
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
|
||||||
|
@ -1500,8 +1500,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
||||||
|
|
||||||
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||||
|
@ -1562,8 +1562,8 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
|
|
||||||
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
@ -1616,8 +1616,8 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
|
||||||
|
|
||||||
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SSmaObj *pSma = NULL;
|
SSmaObj *pSma = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
|
@ -2233,7 +2233,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
|
||||||
|
|
||||||
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
|
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
|
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
|
SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
|
||||||
|
@ -2278,7 +2278,7 @@ static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bo
|
||||||
|
|
||||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
||||||
|
@ -2302,7 +2302,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
|
||||||
|
|
||||||
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
|
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
||||||
|
@ -2656,7 +2656,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
||||||
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
|
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
|
if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
|
||||||
mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
|
mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
|
||||||
|
@ -2779,8 +2779,8 @@ _OVER:
|
||||||
|
|
||||||
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||||
|
@ -2839,8 +2839,8 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
|
|
||||||
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
@ -2945,7 +2945,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
||||||
code = mndDropStb(pMnode, pReq, pDb, pStb);
|
code = mndDropStb(pMnode, pReq, pDb, pStb);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
|
if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
|
||||||
mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
|
mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
|
||||||
|
@ -3016,7 +3016,7 @@ _OVER:
|
||||||
|
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
tFreeSTableMetaRsp(&metaRsp);
|
tFreeSTableMetaRsp(&metaRsp);
|
||||||
//TODO change to TAOS_RETURN
|
// TODO change to TAOS_RETURN
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3562,7 +3562,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
|
|
||||||
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
|
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
|
||||||
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
@ -4259,7 +4259,9 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
|
||||||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
||||||
if (code) goto _OVER;
|
if (code) goto _OVER;
|
||||||
}
|
}
|
||||||
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0;
|
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) {
|
||||||
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSMDropTbsReq(&dropReq);
|
tFreeSMDropTbsReq(&dropReq);
|
||||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||||
|
@ -4458,7 +4460,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
|
||||||
|
|
||||||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
||||||
if (code) goto _end;
|
if (code) goto _end;
|
||||||
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0;
|
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
_end:
|
_end:
|
||||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -324,7 +324,9 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||||
|
|
||||||
size_t len = 0;
|
size_t len = 0;
|
||||||
void *pKey = taosHashGetKey(pDb, &len);
|
void *pKey = taosHashGetKey(pDb, &len);
|
||||||
tstrncpy(p, pKey, 128);
|
int cpLen = (127 < len) ? 127 : len;
|
||||||
|
TAOS_STRNCPY(p, pKey, cpLen);
|
||||||
|
p[cpLen] = '\0';
|
||||||
|
|
||||||
int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
|
int32_t code = doKillCheckpointTrans(pMnode, pKey, len);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -39,6 +39,100 @@ static void metaDestroyLock(SMeta *pMeta) { (void)taosThreadRwlockDestroy(&pMeta
|
||||||
|
|
||||||
static void metaCleanup(SMeta **ppMeta);
|
static void metaCleanup(SMeta **ppMeta);
|
||||||
|
|
||||||
|
static void doScan(SMeta *pMeta) {
|
||||||
|
TBC *cursor = NULL;
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
// open file to write
|
||||||
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
snprintf(path, TSDB_FILENAME_LEN - 1, "%s%s", pMeta->path, TD_DIRSEP "scan.txt");
|
||||||
|
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
|
if (fp == NULL) {
|
||||||
|
metaError("failed to open file:%s, reason:%s", path, tstrerror(terrno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcOpen(pMeta->pTbDb, &cursor, NULL);
|
||||||
|
if (code) {
|
||||||
|
if (taosCloseFile(&fp) != 0) {
|
||||||
|
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
metaError("failed to open table.db cursor, reason:%s", tstrerror(terrno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcMoveToFirst(cursor);
|
||||||
|
if (code) {
|
||||||
|
if (taosCloseFile(&fp) != 0) {
|
||||||
|
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
tdbTbcClose(cursor);
|
||||||
|
metaError("failed to move to first, reason:%s", tstrerror(terrno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
const void *pKey;
|
||||||
|
int kLen;
|
||||||
|
const void *pVal;
|
||||||
|
int vLen;
|
||||||
|
if (tdbTbcGet(cursor, &pKey, &kLen, &pVal, &vLen) < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// decode entry
|
||||||
|
SDecoder dc = {0};
|
||||||
|
SMetaEntry me = {0};
|
||||||
|
|
||||||
|
tDecoderInit(&dc, (uint8_t *)pVal, vLen);
|
||||||
|
|
||||||
|
if (metaDecodeEntry(&dc, &me) < 0) {
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// skip deleted entry
|
||||||
|
if (tdbTbGet(pMeta->pUidIdx, &me.uid, sizeof(me.uid), NULL, NULL) == 0) {
|
||||||
|
// print entry
|
||||||
|
char buf[1024] = {0};
|
||||||
|
if (me.type == TSDB_SUPER_TABLE) {
|
||||||
|
snprintf(buf, sizeof(buf) - 1, "type: super table, version:%" PRId64 " uid: %" PRId64 " name: %s\n", me.version,
|
||||||
|
me.uid, me.name);
|
||||||
|
|
||||||
|
} else if (me.type == TSDB_CHILD_TABLE) {
|
||||||
|
snprintf(buf, sizeof(buf) - 1,
|
||||||
|
"type: child table, version:%" PRId64 " uid: %" PRId64 " name: %s suid:%" PRId64 "\n", me.version,
|
||||||
|
me.uid, me.name, me.ctbEntry.suid);
|
||||||
|
} else {
|
||||||
|
snprintf(buf, sizeof(buf) - 1, "type: normal table, version:%" PRId64 " uid: %" PRId64 " name: %s\n",
|
||||||
|
me.version, me.uid, me.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosWriteFile(fp, buf, strlen(buf)) < 0) {
|
||||||
|
metaError("failed to write file:%s, reason:%s", path, tstrerror(terrno));
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
|
||||||
|
if (tdbTbcMoveToNext(cursor) < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tdbTbcClose(cursor);
|
||||||
|
|
||||||
|
// close file
|
||||||
|
if (taosFsyncFile(fp) < 0) {
|
||||||
|
metaError("failed to fsync file:%s, reason:%s", path, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
if (taosCloseFile(&fp) < 0) {
|
||||||
|
metaError("failed to close file:%s, reason:%s", path, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
SMeta *pMeta = NULL;
|
SMeta *pMeta = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -134,6 +228,11 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
code = metaInitTbFilterCache(pMeta);
|
code = metaInitTbFilterCache(pMeta);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// Do NOT remove this code, it is used to do debug stuff
|
||||||
|
doScan(pMeta);
|
||||||
|
#endif
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
|
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
|
||||||
|
|
|
@ -1583,7 +1583,7 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
||||||
SCtgTSMACache *pCtgCache = NULL;
|
SCtgTSMACache *pCtgCache = NULL;
|
||||||
(void)tNameGetFullDbName(pName, dbFName);
|
(void)tNameGetFullDbName(pName, dbFName);
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache));
|
CTG_ERR_JRET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache));
|
||||||
if (NULL == pDbCache || !pDbCache->tsmaCache) {
|
if (NULL == pDbCache || !pDbCache->tsmaCache) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
@ -1613,6 +1613,7 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
|
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
|
||||||
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
|
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
|
||||||
|
ctgReleaseDBCache(pCtg, pDbCache);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -1621,6 +1622,9 @@ _return:
|
||||||
if (pCtgCache) {
|
if (pCtgCache) {
|
||||||
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
|
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
|
||||||
}
|
}
|
||||||
|
if (pDbCache) {
|
||||||
|
ctgReleaseDBCache(pCtg, pDbCache);
|
||||||
|
}
|
||||||
if (pOp) {
|
if (pOp) {
|
||||||
taosMemoryFree(pOp->data);
|
taosMemoryFree(pOp->data);
|
||||||
taosMemoryFree(pOp);
|
taosMemoryFree(pOp);
|
||||||
|
@ -3996,17 +4000,20 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
||||||
|
|
||||||
if (pCache->retryFetch || hasOutOfDateTSMACache(pCache->pTsmas)) {
|
if (pCache->retryFetch || hasOutOfDateTSMACache(pCache->pTsmas)) {
|
||||||
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
|
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
|
||||||
taosHashRelease(dbCache->tsmaCache, pCache);
|
|
||||||
|
|
||||||
ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName);
|
ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName);
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName));
|
CTG_ERR_JRET(ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName));
|
||||||
if (NULL == taosArrayPush(pCtx->pResList, &(SMetaRes){0})) {
|
if (NULL == taosArrayPush(pCtx->pResList, &(SMetaRes){0})) {
|
||||||
|
taosHashRelease(dbCache->tsmaCache, pCache);
|
||||||
CTG_ERR_JRET(terrno);
|
CTG_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
|
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
|
||||||
|
CTG_LOCK(CTG_WRITE, &pCache->tsmaLock);
|
||||||
pCache->retryFetch = false;
|
pCache->retryFetch = false;
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock);
|
||||||
|
taosHashRelease(dbCache->tsmaCache, pCache);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -890,7 +890,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
|
||||||
} else {
|
} else {
|
||||||
int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
|
int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qDebug("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4294,13 +4294,13 @@ _error:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
|
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, SSDataBlock* pRes, SMetaReader* mr, SStorageAPI* pAPI) {
|
||||||
SStorageAPI* pAPI) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
|
int32_t count = pRes->info.rows;
|
||||||
|
|
||||||
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
|
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
|
||||||
if (!item) {
|
if (!item) {
|
||||||
|
@ -4360,6 +4360,8 @@ _end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
} else {
|
||||||
|
pRes->info.rows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -4715,26 +4717,23 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t count = 0;
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
||||||
|
pRes->info.rows = 0;
|
||||||
|
|
||||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
while (pInfo->curPos < size && pRes->info.rows < pOperator->resultInfo.capacity) {
|
||||||
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
code = doTagScanOneTable(pOperator, pRes, &mr, &pTaskInfo->storageAPI);
|
||||||
if (code != TSDB_CODE_OUT_OF_MEMORY) {
|
if (code != TSDB_CODE_OUT_OF_MEMORY) {
|
||||||
// ignore other error
|
// ignore other error
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
++count;
|
|
||||||
if (++pInfo->curPos >= size) {
|
if (++pInfo->curPos >= size) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.rows = count;
|
|
||||||
|
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
|
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
|
||||||
if (bLimitReached) {
|
if (bLimitReached) {
|
||||||
|
|
|
@ -131,9 +131,6 @@ const SSTabFltFuncDef filterDict[] = {
|
||||||
static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta,
|
static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta,
|
||||||
size_t size, const char* dbName, int64_t* pRows);
|
size_t size, const char* dbName, int64_t* pRows);
|
||||||
|
|
||||||
static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns",
|
|
||||||
"ttl", "stable_name", "vgroup_id', 'uid", "type"};
|
|
||||||
|
|
||||||
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
|
static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"};
|
||||||
|
|
||||||
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
||||||
|
@ -2828,12 +2825,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
|
||||||
SSDataBlock* pRes = NULL;
|
|
||||||
int32_t code = doBlockInfoScanNext(pOperator, &pRes);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyBlockDistScanOperatorInfo(void* param) {
|
static void destroyBlockDistScanOperatorInfo(void* param) {
|
||||||
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
||||||
blockDataDestroy(pDistInfo->pResBlock);
|
blockDataDestroy(pDistInfo->pResBlock);
|
||||||
|
@ -2852,6 +2843,8 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
|
||||||
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
||||||
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t));
|
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t));
|
||||||
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
|
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
|
||||||
|
taosMemoryFree(pCond->colList);
|
||||||
|
taosMemoryFree(pCond->pSlotList);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -278,7 +278,7 @@ static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t in
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
|
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
|
||||||
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) {
|
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo, bool genAfterBlock) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
int32_t rows = pResBlock->info.rows;
|
int32_t rows = pResBlock->info.rows;
|
||||||
|
@ -427,7 +427,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (start.key == INT64_MIN || end.key == INT64_MIN) {
|
if (start.key == INT64_MIN || end.key == INT64_MIN || genAfterBlock) {
|
||||||
colDataSetNULL(pDst, rows);
|
colDataSetNULL(pDst, rows);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -463,8 +463,13 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (genAfterBlock && rows == 0) {
|
||||||
|
hasInterp = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
|
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
|
||||||
if (pkey->isNull == false) {
|
if (pkey->isNull == false && !genAfterBlock) {
|
||||||
code = colDataSetVal(pDst, rows, pkey->pData, false);
|
code = colDataSetVal(pDst, rows, pkey->pData, false);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else {
|
} else {
|
||||||
|
@ -836,7 +841,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
|
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
|
||||||
if (nextTs > pSliceInfo->current) {
|
if (nextTs > pSliceInfo->current) {
|
||||||
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo) &&
|
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo, false) &&
|
||||||
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -864,7 +869,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
doKeepLinearInfo(pSliceInfo, pBlock, i);
|
doKeepLinearInfo(pSliceInfo, pBlock, i);
|
||||||
|
|
||||||
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo) &&
|
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo, false) &&
|
||||||
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -909,13 +914,12 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
|
||||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
|
|
||||||
if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR ||
|
if (pSliceInfo->pPrevGroupKey == NULL) {
|
||||||
pSliceInfo->pPrevGroupKey == NULL) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
|
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo);
|
(void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo, true);
|
||||||
pSliceInfo->current =
|
pSliceInfo->current =
|
||||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,12 +28,12 @@
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CHECK_OUT_OF_MEM(p) \
|
#define CHECK_OUT_OF_MEM(p) \
|
||||||
do { \
|
do { \
|
||||||
if (NULL == (p)) { \
|
if (NULL == (p)) { \
|
||||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
|
pCxt->errCode = terrno; \
|
||||||
goto _err; \
|
goto _err; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CHECK_PARSER_STATUS(pCxt) \
|
#define CHECK_PARSER_STATUS(pCxt) \
|
||||||
|
|
|
@ -47,6 +47,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
|
||||||
SAstCreateContext cxt;
|
SAstCreateContext cxt;
|
||||||
initAstCreateContext(pParseCxt, &cxt);
|
initAstCreateContext(pParseCxt, &cxt);
|
||||||
void* pParser = ParseAlloc((FMalloc)taosMemoryMalloc);
|
void* pParser = ParseAlloc((FMalloc)taosMemoryMalloc);
|
||||||
|
if (!pParser) return terrno;
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
SToken t0 = {0};
|
SToken t0 = {0};
|
||||||
|
|
|
@ -61,6 +61,7 @@ typedef enum {
|
||||||
#define SCH_MAX_TASK_TIMEOUT_USEC 300000000
|
#define SCH_MAX_TASK_TIMEOUT_USEC 300000000
|
||||||
#define SCH_DEFAULT_MAX_RETRY_NUM 6
|
#define SCH_DEFAULT_MAX_RETRY_NUM 6
|
||||||
#define SCH_MIN_AYSNC_EXEC_NUM 3
|
#define SCH_MIN_AYSNC_EXEC_NUM 3
|
||||||
|
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
|
||||||
|
|
||||||
typedef struct SSchDebug {
|
typedef struct SSchDebug {
|
||||||
bool lockEnable;
|
bool lockEnable;
|
||||||
|
|
|
@ -366,9 +366,11 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||||
if (pEpSet) {
|
if (pEpSet) {
|
||||||
pCtx->roundTotal = pEpSet->numOfEps;
|
pCtx->roundTotal = pEpSet->numOfEps;
|
||||||
} else {
|
} else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) {
|
||||||
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
||||||
pCtx->roundTotal = pAddr->epSet.numOfEps;
|
pCtx->roundTotal = pAddr->epSet.numOfEps;
|
||||||
|
} else {
|
||||||
|
pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pCtx->roundTotal = 1;
|
pCtx->roundTotal = 1;
|
||||||
|
|
|
@ -3660,6 +3660,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
||||||
pCur->db = wrapper->db;
|
pCur->db = wrapper->db;
|
||||||
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
(rocksdb_readoptions_t**)&pCur->readOpt);
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
if (pCur->iter == NULL) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
@ -3682,6 +3686,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
||||||
TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
|
TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
|
||||||
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
|
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
|
||||||
|
|
||||||
|
if (!rocksdb_iter_valid(pCur->iter)) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
rocksdb_iter_prev(pCur->iter);
|
rocksdb_iter_prev(pCur->iter);
|
||||||
if (!rocksdb_iter_valid(pCur->iter)) {
|
if (!rocksdb_iter_valid(pCur->iter)) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
|
|
|
@ -24,7 +24,8 @@
|
||||||
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
|
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
|
||||||
|
|
||||||
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
|
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
|
||||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
|
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
|
||||||
|
int32_t* totalBlocks);
|
||||||
|
|
||||||
bool streamTaskShouldStop(const SStreamTask* pTask) {
|
bool streamTaskShouldStop(const SStreamTask* pTask) {
|
||||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||||
|
@ -95,17 +96,53 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
|
||||||
|
SArray* pRes) {
|
||||||
|
SSDataBlock block = {0};
|
||||||
|
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
|
||||||
|
if (num != 1) {
|
||||||
|
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* p = taosArrayGet(pRetrieveBlock->blocks, 0);
|
||||||
|
int32_t code = assignOneDataBlock(&block, p);
|
||||||
|
if (code) {
|
||||||
|
stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
block.info.type = STREAM_PULL_OVER;
|
||||||
|
block.info.childId = pTask->info.selfChildId;
|
||||||
|
|
||||||
|
p = taosArrayPush(pRes, &block);
|
||||||
|
if (p != NULL) {
|
||||||
|
(*pNumOfBlocks) += 1;
|
||||||
|
stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
|
||||||
|
pTask->info.selfChildId, pRetrieveBlock->reqId);
|
||||||
|
} else {
|
||||||
|
code = terrno;
|
||||||
|
stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64" code:%s", pTask->id.idStr,
|
||||||
|
pRetrieveBlock->reqId, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
|
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
void* pExecutor = pTask->exec.pExecutor;
|
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
int32_t numOfBlocks = 0;
|
int32_t numOfBlocks = 0;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
void* pExecutor = pTask->exec.pExecutor;
|
||||||
SArray* pRes = NULL;
|
SArray* pRes = NULL;
|
||||||
|
|
||||||
*totalBlocks = 0;
|
*totalBlocks = 0;
|
||||||
*totalSize = 0;
|
*totalSize = 0;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
SSDataBlock* output = NULL;
|
||||||
|
uint64_t ts = 0;
|
||||||
|
|
||||||
if (pRes == NULL) {
|
if (pRes == NULL) {
|
||||||
pRes = taosArrayInit(4, sizeof(SSDataBlock));
|
pRes = taosArrayInit(4, sizeof(SSDataBlock));
|
||||||
}
|
}
|
||||||
|
@ -115,8 +152,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* output = NULL;
|
|
||||||
uint64_t ts = 0;
|
|
||||||
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
|
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
|
||||||
if (code == TSDB_CODE_QRY_IN_EXEC) {
|
if (code == TSDB_CODE_QRY_IN_EXEC) {
|
||||||
resetTaskInfo(pExecutor);
|
resetTaskInfo(pExecutor);
|
||||||
|
@ -124,6 +159,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
||||||
|
|
||||||
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
|
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
|
||||||
stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
|
stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
qResetTaskCode(pExecutor);
|
qResetTaskCode(pExecutor);
|
||||||
|
@ -133,33 +169,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
||||||
|
|
||||||
if (output == NULL) {
|
if (output == NULL) {
|
||||||
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
SSDataBlock block = {0};
|
code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes);
|
||||||
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
|
if (code) {
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
|
return code;
|
||||||
if (num != 1) {
|
}
|
||||||
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
|
|
||||||
if (code) {
|
|
||||||
stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
block.info.type = STREAM_PULL_OVER;
|
|
||||||
block.info.childId = pTask->info.selfChildId;
|
|
||||||
|
|
||||||
void* p = taosArrayPush(pRes, &block);
|
|
||||||
if (p != NULL) {
|
|
||||||
numOfBlocks += 1;
|
|
||||||
} else {
|
|
||||||
stError("s-task:%s failed to add retrieve block", pTask->id.idStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr,
|
|
||||||
pTask->info.selfChildId, pRetrieveBlock->reqId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -189,11 +203,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
|
||||||
void* p = taosArrayPush(pRes, &block);
|
void* p = taosArrayPush(pRes, &block);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
|
stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
|
||||||
|
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
|
|
||||||
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
|
|
||||||
|
|
||||||
// current output should be dispatched to down stream nodes
|
// current output should be dispatched to down stream nodes
|
||||||
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
|
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
|
||||||
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
|
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
|
||||||
|
@ -303,7 +317,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
|
stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
|
||||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
|
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
|
||||||
}
|
}
|
||||||
|
@ -408,7 +422,7 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
|
if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
|
||||||
status == TASK_STATUS__STOP)) {
|
status == TASK_STATUS__STOP)) {
|
||||||
stError("s-task:%s invalid task status:%d", id, status);
|
stError("s-task:%s invalid task status:%d", id, status);
|
||||||
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -718,7 +732,7 @@ int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpoi
|
||||||
|
|
||||||
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
|
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
|
||||||
int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
|
int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
|
||||||
if(code) {
|
if (code) {
|
||||||
stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
|
stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -833,7 +847,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
if (pState.state == TASK_STATUS__CK) {
|
if (pState.state == TASK_STATUS__CK) {
|
||||||
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
||||||
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
||||||
} else { // todo refactor
|
} else { // todo refactor
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
code = streamTaskSendCheckpointSourceRsp(pTask);
|
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -99,8 +99,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
|
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
stDebug("open stream state %p, %s", pState, path);
|
stDebug("open stream state %p, %s", pState, path);
|
||||||
|
@ -170,12 +170,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = getRowStateRowSize(pState->pFileState);
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
int32_t tmpLen = len;
|
int32_t tmpLen = len;
|
||||||
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen);
|
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
memcpy(buf + len - rowSize, value, vLen);
|
memcpy(buf + len - rowSize, value, vLen);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -189,12 +189,12 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = getRowStateRowSize(pState->pFileState);
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
int32_t tmpLen = len;
|
int32_t tmpLen = len;
|
||||||
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen);
|
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
*ppVal = buf + len - rowSize;
|
*ppVal = buf + len - rowSize;
|
||||||
streamStateReleaseBuf(pState, pVal, false);
|
streamStateReleaseBuf(pState, pVal, false);
|
||||||
|
|
||||||
|
|
|
@ -937,6 +937,7 @@ static int walFindCurMetaVer(SWal* pWal) {
|
||||||
TdDirPtr pDir = taosOpenDir(pWal->path);
|
TdDirPtr pDir = taosOpenDir(pWal->path);
|
||||||
if (pDir == NULL) {
|
if (pDir == NULL) {
|
||||||
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
|
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
|
||||||
|
regfree(&walMetaRegexPattern);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -956,6 +957,7 @@ static int walFindCurMetaVer(SWal* pWal) {
|
||||||
}
|
}
|
||||||
if (taosCloseDir(&pDir) != 0) {
|
if (taosCloseDir(&pDir) != 0) {
|
||||||
wError("failed to close dir, ret:%s", tstrerror(terrno));
|
wError("failed to close dir, ret:%s", tstrerror(terrno));
|
||||||
|
regfree(&walMetaRegexPattern);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
regfree(&walMetaRegexPattern);
|
regfree(&walMetaRegexPattern);
|
||||||
|
|
|
@ -281,6 +281,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
|
||||||
|
|
|
@ -907,7 +907,7 @@ class TDTestCase:
|
||||||
|
|
||||||
## {. . .}
|
## {. . .}
|
||||||
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
|
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
|
||||||
tdSql.checkRows(12)
|
tdSql.checkRows(13)
|
||||||
tdSql.checkData(0, 0, 5)
|
tdSql.checkData(0, 0, 5)
|
||||||
tdSql.checkData(1, 0, 5)
|
tdSql.checkData(1, 0, 5)
|
||||||
tdSql.checkData(2, 0, 10)
|
tdSql.checkData(2, 0, 10)
|
||||||
|
@ -920,6 +920,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(9, 0, 15)
|
tdSql.checkData(9, 0, 15)
|
||||||
tdSql.checkData(10, 0, 15)
|
tdSql.checkData(10, 0, 15)
|
||||||
tdSql.checkData(11, 0, 15)
|
tdSql.checkData(11, 0, 15)
|
||||||
|
tdSql.checkData(12, 0, None)
|
||||||
|
|
||||||
## {} ...
|
## {} ...
|
||||||
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:01', '2020-02-01 00:00:04') every(1s) fill(next)")
|
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:01', '2020-02-01 00:00:04') every(1s) fill(next)")
|
||||||
|
@ -957,10 +958,12 @@ class TDTestCase:
|
||||||
|
|
||||||
## ..{.}
|
## ..{.}
|
||||||
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:13', '2020-02-01 00:00:17') every(1s) fill(next)")
|
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:13', '2020-02-01 00:00:17') every(1s) fill(next)")
|
||||||
tdSql.checkRows(3)
|
tdSql.checkRows(5)
|
||||||
tdSql.checkData(0, 0, 15)
|
tdSql.checkData(0, 0, 15)
|
||||||
tdSql.checkData(1, 0, 15)
|
tdSql.checkData(1, 0, 15)
|
||||||
tdSql.checkData(2, 0, 15)
|
tdSql.checkData(2, 0, 15)
|
||||||
|
tdSql.checkData(3, 0, None)
|
||||||
|
tdSql.checkData(4, 0, None)
|
||||||
|
|
||||||
## ... {}
|
## ... {}
|
||||||
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(next)")
|
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(next)")
|
||||||
|
@ -1272,7 +1275,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(8, 1, True)
|
tdSql.checkData(8, 1, True)
|
||||||
|
|
||||||
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
|
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)")
|
||||||
tdSql.checkRows(12)
|
tdSql.checkRows(13)
|
||||||
tdSql.checkCols(3)
|
tdSql.checkCols(3)
|
||||||
|
|
||||||
tdSql.checkData(0, 0, '2020-02-01 00:00:04.000')
|
tdSql.checkData(0, 0, '2020-02-01 00:00:04.000')
|
||||||
|
@ -1287,6 +1290,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(9, 0, '2020-02-01 00:00:13.000')
|
tdSql.checkData(9, 0, '2020-02-01 00:00:13.000')
|
||||||
tdSql.checkData(10, 0, '2020-02-01 00:00:14.000')
|
tdSql.checkData(10, 0, '2020-02-01 00:00:14.000')
|
||||||
tdSql.checkData(11, 0, '2020-02-01 00:00:15.000')
|
tdSql.checkData(11, 0, '2020-02-01 00:00:15.000')
|
||||||
|
tdSql.checkData(12, 0, '2020-02-01 00:00:16.000')
|
||||||
|
|
||||||
tdSql.checkData(0, 1, True)
|
tdSql.checkData(0, 1, True)
|
||||||
tdSql.checkData(1, 1, False)
|
tdSql.checkData(1, 1, False)
|
||||||
|
@ -1300,6 +1304,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(9, 1, True)
|
tdSql.checkData(9, 1, True)
|
||||||
tdSql.checkData(10, 1, True)
|
tdSql.checkData(10, 1, True)
|
||||||
tdSql.checkData(11, 1, False)
|
tdSql.checkData(11, 1, False)
|
||||||
|
tdSql.checkData(12, 1, True)
|
||||||
|
|
||||||
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-01 00:00:15') every(2s) fill(next)")
|
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-01 00:00:15') every(2s) fill(next)")
|
||||||
tdSql.checkRows(6)
|
tdSql.checkRows(6)
|
||||||
|
@ -1677,9 +1682,13 @@ class TDTestCase:
|
||||||
|
|
||||||
## | . | { | .} |
|
## | . | { | .} |
|
||||||
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(next)")
|
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(next)")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(6)
|
||||||
tdSql.checkData(0, 0, 15)
|
tdSql.checkData(0, 0, 15)
|
||||||
tdSql.checkData(1, 0, 15)
|
tdSql.checkData(1, 0, 15)
|
||||||
|
tdSql.checkData(2, 0, None)
|
||||||
|
tdSql.checkData(3, 0, None)
|
||||||
|
tdSql.checkData(4, 0, None)
|
||||||
|
tdSql.checkData(5, 0, None)
|
||||||
|
|
||||||
# test fill linear
|
# test fill linear
|
||||||
|
|
||||||
|
@ -2732,7 +2741,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(4, i, 15)
|
tdSql.checkData(4, i, 15)
|
||||||
|
|
||||||
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(next)")
|
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(next)")
|
||||||
tdSql.checkRows(3)
|
tdSql.checkRows(5)
|
||||||
tdSql.checkCols(4)
|
tdSql.checkCols(4)
|
||||||
|
|
||||||
for i in range (tdSql.queryCols):
|
for i in range (tdSql.queryCols):
|
||||||
|
@ -2828,7 +2837,7 @@ class TDTestCase:
|
||||||
|
|
||||||
# test fill next
|
# test fill next
|
||||||
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(next)")
|
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(next)")
|
||||||
tdSql.checkRows(18)
|
tdSql.checkRows(19)
|
||||||
tdSql.checkCols(3)
|
tdSql.checkCols(3)
|
||||||
|
|
||||||
tdSql.checkData(0, 0, '2020-02-02 00:00:00.000')
|
tdSql.checkData(0, 0, '2020-02-02 00:00:00.000')
|
||||||
|
@ -2851,6 +2860,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(15, 2, None)
|
tdSql.checkData(15, 2, None)
|
||||||
tdSql.checkData(16, 2, None)
|
tdSql.checkData(16, 2, None)
|
||||||
tdSql.checkData(17, 2, None)
|
tdSql.checkData(17, 2, None)
|
||||||
|
tdSql.checkData(18, 2, None)
|
||||||
|
|
||||||
tdSql.checkData(17, 0, '2020-02-02 00:00:17.000')
|
tdSql.checkData(17, 0, '2020-02-02 00:00:17.000')
|
||||||
|
|
||||||
|
@ -3081,7 +3091,7 @@ class TDTestCase:
|
||||||
|
|
||||||
# test fill linear
|
# test fill linear
|
||||||
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(linear)")
|
tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(linear)")
|
||||||
tdSql.checkRows(17)
|
tdSql.checkRows(18)
|
||||||
tdSql.checkCols(3)
|
tdSql.checkCols(3)
|
||||||
|
|
||||||
tdSql.checkData(0, 0, '2020-02-02 00:00:01.000')
|
tdSql.checkData(0, 0, '2020-02-02 00:00:01.000')
|
||||||
|
@ -3103,8 +3113,9 @@ class TDTestCase:
|
||||||
tdSql.checkData(14, 2, None)
|
tdSql.checkData(14, 2, None)
|
||||||
tdSql.checkData(15, 2, None)
|
tdSql.checkData(15, 2, None)
|
||||||
tdSql.checkData(16, 2, None)
|
tdSql.checkData(16, 2, None)
|
||||||
|
tdSql.checkData(17, 2, None)
|
||||||
|
|
||||||
tdSql.checkData(16, 0, '2020-02-02 00:00:17.000')
|
tdSql.checkData(17, 0, '2020-02-02 00:00:18.000')
|
||||||
|
|
||||||
tdLog.printNoPrefix("==========step13:test error cases")
|
tdLog.printNoPrefix("==========step13:test error cases")
|
||||||
|
|
||||||
|
@ -3220,7 +3231,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(17, 1, True)
|
tdSql.checkData(17, 1, True)
|
||||||
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
||||||
tdSql.checkRows(18)
|
tdSql.checkRows(19)
|
||||||
|
|
||||||
tdSql.checkData(0, 0, '2020-02-01 00:00:00.000')
|
tdSql.checkData(0, 0, '2020-02-01 00:00:00.000')
|
||||||
tdSql.checkData(0, 1, True)
|
tdSql.checkData(0, 1, True)
|
||||||
|
@ -3243,9 +3254,12 @@ class TDTestCase:
|
||||||
tdSql.checkData(15, 2, 15)
|
tdSql.checkData(15, 2, 15)
|
||||||
tdSql.checkData(16, 2, 17)
|
tdSql.checkData(16, 2, 17)
|
||||||
tdSql.checkData(17, 2, 17)
|
tdSql.checkData(17, 2, 17)
|
||||||
|
tdSql.checkData(18, 2, None)
|
||||||
|
|
||||||
tdSql.checkData(17, 0, '2020-02-01 00:00:17.000')
|
tdSql.checkData(17, 0, '2020-02-01 00:00:17.000')
|
||||||
tdSql.checkData(17, 1, False)
|
tdSql.checkData(17, 1, False)
|
||||||
|
tdSql.checkData(18, 0, '2020-02-01 00:00:18.000')
|
||||||
|
tdSql.checkData(18, 1, True)
|
||||||
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
||||||
tdSql.checkRows(17)
|
tdSql.checkRows(17)
|
||||||
|
@ -3362,24 +3376,24 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
||||||
|
|
||||||
tdSql.checkRows(48)
|
tdSql.checkRows(57)
|
||||||
for i in range(0, 14):
|
for i in range(0, 19):
|
||||||
tdSql.checkData(i, 0, 'ctb1')
|
tdSql.checkData(i, 0, 'ctb1')
|
||||||
|
|
||||||
for i in range(14, 30):
|
for i in range(19, 38):
|
||||||
tdSql.checkData(i, 0, 'ctb2')
|
tdSql.checkData(i, 0, 'ctb2')
|
||||||
|
|
||||||
for i in range(30, 48):
|
for i in range(38, 57):
|
||||||
tdSql.checkData(i, 0, 'ctb3')
|
tdSql.checkData(i, 0, 'ctb3')
|
||||||
|
|
||||||
tdSql.checkData(0, 1, '2020-02-01 00:00:00.000')
|
tdSql.checkData(0, 1, '2020-02-01 00:00:00.000')
|
||||||
tdSql.checkData(13, 1, '2020-02-01 00:00:13.000')
|
tdSql.checkData(18, 1, '2020-02-01 00:00:18.000')
|
||||||
|
|
||||||
tdSql.checkData(14, 1, '2020-02-01 00:00:00.000')
|
tdSql.checkData(19, 1, '2020-02-01 00:00:00.000')
|
||||||
tdSql.checkData(29, 1, '2020-02-01 00:00:15.000')
|
tdSql.checkData(37, 1, '2020-02-01 00:00:18.000')
|
||||||
|
|
||||||
tdSql.checkData(30, 1, '2020-02-01 00:00:00.000')
|
tdSql.checkData(38, 1, '2020-02-01 00:00:00.000')
|
||||||
tdSql.checkData(47, 1, '2020-02-01 00:00:17.000')
|
tdSql.checkData(56, 1, '2020-02-01 00:00:18.000')
|
||||||
|
|
||||||
for i in range(0, 2):
|
for i in range(0, 2):
|
||||||
tdSql.checkData(i, 3, 1)
|
tdSql.checkData(i, 3, 1)
|
||||||
|
@ -3390,24 +3404,33 @@ class TDTestCase:
|
||||||
for i in range(8, 14):
|
for i in range(8, 14):
|
||||||
tdSql.checkData(i, 3, 13)
|
tdSql.checkData(i, 3, 13)
|
||||||
|
|
||||||
for i in range(14, 18):
|
for i in range(14, 19):
|
||||||
|
tdSql.checkData(i, 3, None)
|
||||||
|
|
||||||
|
for i in range(19, 23):
|
||||||
tdSql.checkData(i, 3, 3)
|
tdSql.checkData(i, 3, 3)
|
||||||
|
|
||||||
for i in range(18, 24):
|
for i in range(23, 29):
|
||||||
tdSql.checkData(i, 3, 9)
|
tdSql.checkData(i, 3, 9)
|
||||||
|
|
||||||
for i in range(24, 30):
|
for i in range(29, 35):
|
||||||
tdSql.checkData(i, 3, 15)
|
tdSql.checkData(i, 3, 15)
|
||||||
|
|
||||||
for i in range(30, 36):
|
for i in range(35, 38):
|
||||||
|
tdSql.checkData(i, 3, None)
|
||||||
|
|
||||||
|
for i in range(38, 44):
|
||||||
tdSql.checkData(i, 3, 5)
|
tdSql.checkData(i, 3, 5)
|
||||||
|
|
||||||
for i in range(36, 42):
|
for i in range(44, 50):
|
||||||
tdSql.checkData(i, 3, 11)
|
tdSql.checkData(i, 3, 11)
|
||||||
|
|
||||||
for i in range(42, 48):
|
for i in range(50, 56):
|
||||||
tdSql.checkData(i, 3, 17)
|
tdSql.checkData(i, 3, 17)
|
||||||
|
|
||||||
|
for i in range(56, 57):
|
||||||
|
tdSql.checkData(i, 3, None)
|
||||||
|
|
||||||
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
||||||
|
|
||||||
tdSql.checkRows(39)
|
tdSql.checkRows(39)
|
||||||
|
@ -3450,7 +3473,7 @@ class TDTestCase:
|
||||||
tdSql.checkRows(90)
|
tdSql.checkRows(90)
|
||||||
|
|
||||||
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
||||||
tdSql.checkRows(90)
|
tdSql.checkRows(171)
|
||||||
|
|
||||||
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
||||||
tdSql.checkRows(9)
|
tdSql.checkRows(9)
|
||||||
|
@ -3467,7 +3490,7 @@ class TDTestCase:
|
||||||
tdSql.checkRows(48)
|
tdSql.checkRows(48)
|
||||||
|
|
||||||
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
|
||||||
tdSql.checkRows(48)
|
tdSql.checkRows(57)
|
||||||
|
|
||||||
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
|
||||||
tdSql.checkRows(39)
|
tdSql.checkRows(39)
|
||||||
|
@ -4363,7 +4386,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{tbname_null} range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{tbname_null} range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
|
||||||
|
|
||||||
tdSql.checkRows(9)
|
tdSql.checkRows(11)
|
||||||
tdSql.checkData(0, 1, False)
|
tdSql.checkData(0, 1, False)
|
||||||
tdSql.checkData(1, 1, True)
|
tdSql.checkData(1, 1, True)
|
||||||
tdSql.checkData(2, 1, False)
|
tdSql.checkData(2, 1, False)
|
||||||
|
@ -4373,6 +4396,8 @@ class TDTestCase:
|
||||||
tdSql.checkData(6, 1, True)
|
tdSql.checkData(6, 1, True)
|
||||||
tdSql.checkData(7, 1, False)
|
tdSql.checkData(7, 1, False)
|
||||||
tdSql.checkData(8, 1, False)
|
tdSql.checkData(8, 1, False)
|
||||||
|
tdSql.checkData(9, 1, True)
|
||||||
|
tdSql.checkData(10, 1, True)
|
||||||
|
|
||||||
tdSql.checkData(0, 2, 1)
|
tdSql.checkData(0, 2, 1)
|
||||||
tdSql.checkData(1, 2, 3)
|
tdSql.checkData(1, 2, 3)
|
||||||
|
@ -4383,11 +4408,13 @@ class TDTestCase:
|
||||||
tdSql.checkData(6, 2, 8)
|
tdSql.checkData(6, 2, 8)
|
||||||
tdSql.checkData(7, 2, 8)
|
tdSql.checkData(7, 2, 8)
|
||||||
tdSql.checkData(8, 2, 9)
|
tdSql.checkData(8, 2, 9)
|
||||||
|
tdSql.checkData(9, 2, None)
|
||||||
|
tdSql.checkData(10, 2, None)
|
||||||
|
|
||||||
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{tbname_null} where c0 is not null range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{tbname_null} where c0 is not null range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)")
|
||||||
|
|
||||||
tdSql.checkRows(9)
|
tdSql.checkRows(11)
|
||||||
tdSql.checkData(0, 1, False)
|
tdSql.checkData(0, 1, False)
|
||||||
tdSql.checkData(1, 1, True)
|
tdSql.checkData(1, 1, True)
|
||||||
tdSql.checkData(2, 1, False)
|
tdSql.checkData(2, 1, False)
|
||||||
|
@ -4397,6 +4424,9 @@ class TDTestCase:
|
||||||
tdSql.checkData(6, 1, True)
|
tdSql.checkData(6, 1, True)
|
||||||
tdSql.checkData(7, 1, False)
|
tdSql.checkData(7, 1, False)
|
||||||
tdSql.checkData(8, 1, False)
|
tdSql.checkData(8, 1, False)
|
||||||
|
tdSql.checkData(9, 1, True)
|
||||||
|
tdSql.checkData(10, 1, True)
|
||||||
|
|
||||||
|
|
||||||
tdSql.checkData(0, 2, 1)
|
tdSql.checkData(0, 2, 1)
|
||||||
tdSql.checkData(1, 2, 3)
|
tdSql.checkData(1, 2, 3)
|
||||||
|
@ -4407,6 +4437,8 @@ class TDTestCase:
|
||||||
tdSql.checkData(6, 2, 8)
|
tdSql.checkData(6, 2, 8)
|
||||||
tdSql.checkData(7, 2, 8)
|
tdSql.checkData(7, 2, 8)
|
||||||
tdSql.checkData(8, 2, 9)
|
tdSql.checkData(8, 2, 9)
|
||||||
|
tdSql.checkData(9, 2, None)
|
||||||
|
tdSql.checkData(10, 2, None)
|
||||||
|
|
||||||
# super table
|
# super table
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
||||||
|
@ -4443,7 +4475,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
||||||
|
|
||||||
tdSql.checkRows(8)
|
tdSql.checkRows(9)
|
||||||
tdSql.checkData(0, 1, False)
|
tdSql.checkData(0, 1, False)
|
||||||
tdSql.checkData(1, 1, True)
|
tdSql.checkData(1, 1, True)
|
||||||
tdSql.checkData(2, 1, True)
|
tdSql.checkData(2, 1, True)
|
||||||
|
@ -4452,6 +4484,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(5, 1, True)
|
tdSql.checkData(5, 1, True)
|
||||||
tdSql.checkData(6, 1, False)
|
tdSql.checkData(6, 1, False)
|
||||||
tdSql.checkData(7, 1, False)
|
tdSql.checkData(7, 1, False)
|
||||||
|
tdSql.checkData(8, 1, True)
|
||||||
|
|
||||||
tdSql.checkData(0, 2, 1)
|
tdSql.checkData(0, 2, 1)
|
||||||
tdSql.checkData(1, 2, 9)
|
tdSql.checkData(1, 2, 9)
|
||||||
|
@ -4461,11 +4494,12 @@ class TDTestCase:
|
||||||
tdSql.checkData(5, 2, 13)
|
tdSql.checkData(5, 2, 13)
|
||||||
tdSql.checkData(6, 2, 13)
|
tdSql.checkData(6, 2, 13)
|
||||||
tdSql.checkData(7, 2, 15)
|
tdSql.checkData(7, 2, 15)
|
||||||
|
tdSql.checkData(8, 2, None)
|
||||||
|
|
||||||
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
||||||
|
|
||||||
tdSql.checkRows(8)
|
tdSql.checkRows(9)
|
||||||
tdSql.checkData(0, 1, False)
|
tdSql.checkData(0, 1, False)
|
||||||
tdSql.checkData(1, 1, True)
|
tdSql.checkData(1, 1, True)
|
||||||
tdSql.checkData(2, 1, True)
|
tdSql.checkData(2, 1, True)
|
||||||
|
@ -4474,6 +4508,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(5, 1, True)
|
tdSql.checkData(5, 1, True)
|
||||||
tdSql.checkData(6, 1, False)
|
tdSql.checkData(6, 1, False)
|
||||||
tdSql.checkData(7, 1, False)
|
tdSql.checkData(7, 1, False)
|
||||||
|
tdSql.checkData(8, 1, True)
|
||||||
|
|
||||||
tdSql.checkData(0, 2, 1)
|
tdSql.checkData(0, 2, 1)
|
||||||
tdSql.checkData(1, 2, 9)
|
tdSql.checkData(1, 2, 9)
|
||||||
|
@ -4483,36 +4518,37 @@ class TDTestCase:
|
||||||
tdSql.checkData(5, 2, 13)
|
tdSql.checkData(5, 2, 13)
|
||||||
tdSql.checkData(6, 2, 13)
|
tdSql.checkData(6, 2, 13)
|
||||||
tdSql.checkData(7, 2, 15)
|
tdSql.checkData(7, 2, 15)
|
||||||
|
tdSql.checkData(8, 2, None)
|
||||||
|
|
||||||
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
||||||
|
|
||||||
tdSql.checkRows(15)
|
tdSql.checkRows(18)
|
||||||
for i in range(0, 7):
|
for i in range(0, 9):
|
||||||
tdSql.checkData(i, 0, 'ctb1_null')
|
tdSql.checkData(i, 0, 'ctb1_null')
|
||||||
|
|
||||||
for i in range(7, 15):
|
for i in range(9, 18):
|
||||||
tdSql.checkData(i, 0, 'ctb2_null')
|
tdSql.checkData(i, 0, 'ctb2_null')
|
||||||
|
|
||||||
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
|
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
|
||||||
tdSql.checkData(6, 1, '2020-02-01 00:00:13.000')
|
tdSql.checkData(8, 1, '2020-02-01 00:00:17.000')
|
||||||
|
|
||||||
tdSql.checkData(7, 1, '2020-02-01 00:00:01.000')
|
tdSql.checkData(9, 1, '2020-02-01 00:00:01.000')
|
||||||
tdSql.checkData(14, 1, '2020-02-01 00:00:15.000')
|
tdSql.checkData(17, 1, '2020-02-01 00:00:17.000')
|
||||||
|
|
||||||
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)")
|
||||||
|
|
||||||
tdSql.checkRows(15)
|
tdSql.checkRows(18)
|
||||||
for i in range(0, 7):
|
for i in range(0, 9):
|
||||||
tdSql.checkData(i, 0, 'ctb1_null')
|
tdSql.checkData(i, 0, 'ctb1_null')
|
||||||
|
|
||||||
for i in range(7, 15):
|
for i in range(9, 18):
|
||||||
tdSql.checkData(i, 0, 'ctb2_null')
|
tdSql.checkData(i, 0, 'ctb2_null')
|
||||||
|
|
||||||
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
|
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
|
||||||
tdSql.checkData(6, 1, '2020-02-01 00:00:13.000')
|
tdSql.checkData(8, 1, '2020-02-01 00:00:17.000')
|
||||||
|
|
||||||
tdSql.checkData(7, 1, '2020-02-01 00:00:01.000')
|
tdSql.checkData(9, 1, '2020-02-01 00:00:01.000')
|
||||||
tdSql.checkData(14, 1, '2020-02-01 00:00:15.000')
|
tdSql.checkData(17, 1, '2020-02-01 00:00:17.000')
|
||||||
|
|
||||||
# fill linear
|
# fill linear
|
||||||
# normal table
|
# normal table
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
from taos.tmq import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.execute(f'create database if not exists db_taosx')
|
||||||
|
tdSql.execute(f'create database if not exists db_5466')
|
||||||
|
tdSql.execute(f'use db_5466')
|
||||||
|
tdSql.execute(f'create stable if not exists s5466 (ts timestamp, c1 int, c2 int) tags (t binary(32))')
|
||||||
|
tdSql.execute(f'insert into t1 using s5466 tags("__devicid__") values(1669092069068, 0, 1)')
|
||||||
|
for i in range(80):
|
||||||
|
if i < 3:
|
||||||
|
continue
|
||||||
|
tdSql.execute(f'alter stable s5466 add column c{i} int')
|
||||||
|
tdSql.execute(f'insert into t1(ts, c1, c2) values(1669092069067, 0, 1)')
|
||||||
|
tdSql.execute(f'flush database db_5466')
|
||||||
|
|
||||||
|
tdSql.execute("create topic db_5466_topic with meta as database db_5466")
|
||||||
|
buildPath = tdCom.getBuildPath()
|
||||||
|
cmdStr = '%s/build/bin/tmq_ts5466'%(buildPath)
|
||||||
|
tdLog.info(cmdStr)
|
||||||
|
os.system(cmdStr)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,38 @@
|
||||||
|
import threading
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.common import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
self.tdCom = tdCom
|
||||||
|
self.dbname = "stream_test"
|
||||||
|
|
||||||
|
def stream_with_pk_tag(self):
|
||||||
|
tdSql.execute(f"create database {self.dbname} vgroups 4;")
|
||||||
|
tdSql.execute(f"use {self.dbname};")
|
||||||
|
tdSql.execute("create table st(ts timestamp, a int primary key, b int , c int, d double) tags(ta varchar(100),tb int,tc int);")
|
||||||
|
tdSql.execute('create table t1 using st tags("aa", 1, 2);')
|
||||||
|
tdSql.execute('create stream streams3_2 trigger at_once ignore expired 0 ignore update 0 into streamt3_2 as select _wstart, a, max(b), count(*), ta from st partition by ta, a session(ts, 10s);;')
|
||||||
|
sql_list = ["insert into stream_test.t1 values(1648791210001,1,2,3,4.1);", "insert into stream_test.t1 values(1648791210002,2,2,3,1.1);", "insert into stream_test.t1 values(1648791220000,3,2,3,2.1);", "insert into stream_test.t1 values(1648791220001,4,2,3,3.1);"]
|
||||||
|
for i in range(5):
|
||||||
|
for sql in sql_list:
|
||||||
|
tdSql.execute(sql)
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.stream_with_pk_tag()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -3,6 +3,7 @@ add_dependencies(tmq_demo taos)
|
||||||
add_executable(tmq_sim tmqSim.c)
|
add_executable(tmq_sim tmqSim.c)
|
||||||
add_executable(create_table createTable.c)
|
add_executable(create_table createTable.c)
|
||||||
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
|
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
|
||||||
|
add_executable(tmq_ts5466 tmq_ts5466.c)
|
||||||
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
|
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
|
||||||
add_executable(write_raw_block_test write_raw_block_test.c)
|
add_executable(write_raw_block_test write_raw_block_test.c)
|
||||||
add_executable(sml_test sml_test.c)
|
add_executable(sml_test sml_test.c)
|
||||||
|
@ -54,6 +55,13 @@ target_link_libraries(
|
||||||
PUBLIC common
|
PUBLIC common
|
||||||
PUBLIC os
|
PUBLIC os
|
||||||
)
|
)
|
||||||
|
target_link_libraries(
|
||||||
|
tmq_ts5466
|
||||||
|
PUBLIC taos
|
||||||
|
PUBLIC util
|
||||||
|
PUBLIC common
|
||||||
|
PUBLIC os
|
||||||
|
)
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
tmq_taosx_ci
|
tmq_taosx_ci
|
||||||
PUBLIC taos
|
PUBLIC taos
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <assert.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include "cJSON.h"
|
||||||
|
#include "taos.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "types.h"
|
||||||
|
|
||||||
|
static TAOS* use_db() {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
if (pConn == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "use db_taosx");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
return pConn;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void msg_process(TAOS_RES* msg) {
|
||||||
|
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
|
||||||
|
printf("db: %s\n", tmq_get_db_name(msg));
|
||||||
|
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||||
|
TAOS* pConn = use_db();
|
||||||
|
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) {
|
||||||
|
char* result = tmq_get_json_meta(msg);
|
||||||
|
printf("meta result: %s\n", result);
|
||||||
|
tmq_free_json_meta(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
tmq_raw_data raw = {0};
|
||||||
|
tmq_get_raw(msg, &raw);
|
||||||
|
printf("write raw data type: %d\n", raw.raw_type);
|
||||||
|
int32_t ret = tmq_write_raw(pConn, raw);
|
||||||
|
printf("write raw data: %s\n", tmq_err2str(ret));
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
tmq_free_raw(raw);
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
|
printf("commit %d tmq %p param %p\n", code, tmq, param);
|
||||||
|
}
|
||||||
|
|
||||||
|
tmq_t* build_consumer() {
|
||||||
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
tmq_conf_set(conf, "group.id", "tg2");
|
||||||
|
tmq_conf_set(conf, "client.id", "my app 1");
|
||||||
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
tmq_conf_set(conf, "msg.consume.excluded", "1");
|
||||||
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
|
|
||||||
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
assert(tmq);
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
return tmq;
|
||||||
|
}
|
||||||
|
|
||||||
|
tmq_list_t* build_topic_list() {
|
||||||
|
tmq_list_t* topic_list = tmq_list_new();
|
||||||
|
tmq_list_append(topic_list, "db_5466_topic");
|
||||||
|
return topic_list;
|
||||||
|
}
|
||||||
|
|
||||||
|
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
if ((code = tmq_subscribe(tmq, topics))) {
|
||||||
|
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
|
||||||
|
printf("subscribe err\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int32_t cnt = 0;
|
||||||
|
while (1) {
|
||||||
|
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000);
|
||||||
|
if (tmqmessage) {
|
||||||
|
cnt++;
|
||||||
|
msg_process(tmqmessage);
|
||||||
|
taos_free_result(tmqmessage);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tmq_consumer_close(tmq);
|
||||||
|
if (code)
|
||||||
|
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
|
||||||
|
else
|
||||||
|
fprintf(stderr, "%% Consumer closed\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
tmq_t* tmq = build_consumer();
|
||||||
|
tmq_list_t* topic_list = build_topic_list();
|
||||||
|
basic_consume_loop(tmq, topic_list);
|
||||||
|
tmq_list_destroy(topic_list);
|
||||||
|
}
|
Loading…
Reference in New Issue