Merge branch '3.0' into enh/TD-29801-3.0

kailixu 2024-04-28 04:27:25 +08:00
commit 0b5e35643c
20 changed files with 361 additions and 211 deletions

View File

@ -435,6 +435,7 @@ typedef struct SUpstreamInfo {
typedef struct SDownstreamStatusInfo {
int64_t reqId;
int32_t taskId;
int32_t vgId;
int64_t rspTs;
int32_t status;
} SDownstreamStatusInfo;
@ -847,12 +848,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);

View File

@ -159,7 +159,7 @@ done
tools=(${clientName} ${benchmarkName} ${dumpName} ${demoName} remove.sh udfd set_core.sh TDinsight.sh start_pre.sh)
if [ "${verMode}" == "cluster" ]; then
services=(${serverName} ${adapterName} ${xname} ${explorerName} ${keeperName})
elif [ "${verMode}" == "community" ]; then
elif [ "${verMode}" == "edge" ]; then
if [ "${pagMode}" == "full" ]; then
services=(${serverName} ${adapterName} ${keeperName} ${explorerName})
else
@ -229,6 +229,10 @@ function install_bin() {
fi
fi
if [ -f ${script_dir}/bin/quick_deploy.sh ]; then
${csudo}cp -r ${script_dir}/bin/quick_deploy.sh ${install_main_dir}/bin
fi
${csudo}chmod 0555 ${install_main_dir}/bin/*
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}mv ${install_main_dir}/bin/remove.sh ${install_main_dir}/uninstall.sh || :
@ -503,7 +507,7 @@ function local_fqdn_check() {
function install_taosx_config() {
[ ! -z $1 ] && return 0 || : # only install client
fileName="${script_dir}/${xname}/etc/taos/${xname}.toml"
fileName="${script_dir}/${xname}/etc/${PREFIX}/${xname}.toml"
if [ -f ${fileName} ]; then
${csudo}sed -i -r "s/#*\s*(fqdn\s*=\s*).*/\1\"${serverFqdn}\"/" ${fileName}
@ -520,10 +524,11 @@ function install_explorer_config() {
[ ! -z $1 ] && return 0 || : # only install client
if [ "$verMode" == "cluster" ]; then
fileName="${script_dir}/${xname}/etc/taos/explorer.toml"
fileName="${script_dir}/${xname}/etc/${PREFIX}/explorer.toml"
else
fileName="${script_dir}/cfg/explorer.toml"
}
fi
if [ -f ${fileName} ]; then
${csudo}sed -i "s/localhost/${serverFqdn}/g" ${fileName}
@ -655,13 +660,13 @@ function install_connector() {
function install_examples() {
if [ -d ${script_dir}/examples ]; then
${csudo}cp -rf ${script_dir}/examples/* ${install_main_dir}/examples || echo "failed to copy examples"
${csudo}cp -rf ${script_dir}/examples ${install_main_dir}/ || echo "failed to copy examples"
fi
}
function install_plugins() {
if [ -d ${script_dir}/${xname}/plugins ]; then
${csudo}cp -rf ${script_dir}/${xname}/plugins/ ${install_main_dir}/ || echo "failed to copy taosx plugins"
${csudo}cp -rf ${script_dir}/${xname}/plugins/ ${install_main_dir}/ || echo "failed to copy ${PREFIX}x plugins"
fi
}

View File

@ -832,7 +832,7 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
return result;
}
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray *checkDumplicateCols,
ESchemaAction *action, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS;
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
@ -843,6 +843,13 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
return code;
}
}
for (int j = 0; j < taosArrayGetSize(checkDumplicateCols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(checkDumplicateCols, j);
if(taosHashGet(schemaHash, kv->key, kv->keyLen) != NULL){
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
}
}
return TSDB_CODE_SUCCESS;
}
@ -1106,7 +1113,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
ESchemaAction action = SCHEMA_ACTION_NULL;
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1181,7 +1188,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
}
action = SCHEMA_ACTION_NULL;
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
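
The extra checkDumplicateCols argument makes smlProcessSchemaAction reject a key that appears both as a tag and as a column: while processing one set it also probes the other set against the hash of existing schema names and returns TSDB_CODE_PAR_DUPLICATED_COLUMN on a hit. Below is a minimal, self-contained sketch of that cross-check; it uses a plain linear lookup and placeholder names (has_key, check_duplicate) rather than TDengine's SHashObj API.

#include <stdio.h>
#include <string.h>

/* simplified stand-in for the schema-name hash built from pTableMeta->schema */
static int has_key(const char **schema, int n, const char *key) {
    for (int i = 0; i < n; i++) {
        if (strcmp(schema[i], key) == 0) return 1;
    }
    return 0;
}

/* non-zero if any key in `other` (e.g. the columns while tags are being checked)
 * already exists in the table schema, mirroring the duplicated-column check */
static int check_duplicate(const char **schema, int n, const char **other, int m) {
    for (int j = 0; j < m; j++) {
        if (has_key(schema, n, other[j])) return 1;  /* would map to TSDB_CODE_PAR_DUPLICATED_COLUMN */
    }
    return 0;
}

int main(void) {
    const char *schema[] = {"ts", "f1", "f2"};   /* existing column names */
    const char *tags[]   = {"t1", "t2", "f1"};   /* incoming tag keys: "f1" collides */
    printf("duplicate: %d\n", check_duplicate(schema, 3, tags, 3));  /* prints 1 */
    return 0;
}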

View File

@ -191,8 +191,8 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

View File

@ -272,7 +272,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
int32_t tsTransPullupInterval = 2;
int32_t tsCompactPullupInterval = 10;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 300;
int32_t tsStreamCheckpointInterval = 60;
float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 16;
int32_t tsTtlUnit = 86400;

View File

@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#define PROCESS_THRESHOLD (2000 * 1000)
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
@ -53,6 +55,14 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = mndProcessRpcMsg(pMsg);
if (pInfo->timestamp != 0) {
int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
if (cost > PROCESS_THRESHOLD) {
dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
}
}
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code);
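
The new block times each mnode message and warns when handling exceeds PROCESS_THRESHOLD (2,000,000 microseconds, i.e. 2 seconds). A standalone sketch of the same elapsed-time guard, using clock_gettime in place of taosGetTimestampUs and printf in place of dGWarn:

#include <stdio.h>
#include <stdint.h>
#include <time.h>

#define PROCESS_THRESHOLD (2000 * 1000)  /* 2 seconds, in microseconds */

static int64_t now_us(void) {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

int main(void) {
    int64_t start = now_us();
    /* ... handle the message ... */
    int64_t cost = now_us() - start;
    if (cost > PROCESS_THRESHOLD) {
        /* same intent as the dGWarn in mmProcessRpcMsg */
        printf("message has been processed for too long, cost: %lld s\n",
               (long long)(cost / (1000 * 1000)));
    }
    return 0;
}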

View File

@ -27,6 +27,8 @@
#define ARBGROUP_VER_NUMBER 1
#define ARBGROUP_RESERVE_SIZE 64
static SHashObj *arbUpdateHash = NULL;
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
@ -74,10 +76,14 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupArbGroup(SMnode *pMnode) {}
void mndCleanupArbGroup(SMnode *pMnode) {
taosHashCleanup(arbUpdateHash);
}
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
@ -221,8 +227,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
" new version:%" PRId64,
pOld->vgId, pOld, pNew, pOld->version, pNew->version);
taosThreadMutexUnlock(&pOld->mutex);
return 0;
goto _OVER;
}
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
@ -232,7 +237,11 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
pOld->version++;
_OVER:
taosThreadMutexUnlock(&pOld->mutex);
taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t));
return 0;
}
@ -645,6 +654,11 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
}
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
return 0;
}
int32_t contLen = 0;
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
if (!pHead) {
@ -653,7 +667,11 @@ static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
}
SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
if (ret == 0) {
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
}
return ret;
}
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
@ -930,8 +948,12 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
SVArbCheckSyncRsp syncRsp = {0};
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code));
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
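
Most of the hunks in this file cooperate to add a simple in-flight guard: arbUpdateHash is created in mndInitArbGroup and freed in mndCleanupArbGroup, mndPullupArbUpdateGroup skips a vgroup whose update is already queued and records the vgId only after tmsgPutToQueue succeeds, and mndArbGroupActionUpdate drops the entry once the update has been applied. The sketch below shows the same guard in isolation; it keeps the in-flight set in a plain array instead of taosHash, and enqueue_request is a hypothetical stand-in for building the message and calling tmsgPutToQueue.

#include <stdio.h>
#include <stdbool.h>

#define MAX_GROUPS 64

static int inflight[MAX_GROUPS];
static int inflightNum = 0;

static bool is_inflight(int vgId) {
    for (int i = 0; i < inflightNum; i++)
        if (inflight[i] == vgId) return true;
    return false;
}

static void mark_inflight(int vgId) { inflight[inflightNum++] = vgId; }

static void clear_inflight(int vgId) {
    for (int i = 0; i < inflightNum; i++)
        if (inflight[i] == vgId) { inflight[i] = inflight[--inflightNum]; return; }
}

/* hypothetical stand-in for building the msg and calling tmsgPutToQueue */
static int enqueue_request(int vgId) { printf("enqueue update for vgId:%d\n", vgId); return 0; }

/* mirrors mndPullupArbUpdateGroup: skip if already in process, mark only on success */
static int pullup_update(int vgId) {
    if (is_inflight(vgId)) {
        printf("vgId:%d skip, update already in process\n", vgId);
        return 0;
    }
    int ret = enqueue_request(vgId);
    if (ret == 0) mark_inflight(vgId);
    return ret;
}

/* mirrors the tail of mndArbGroupActionUpdate: drop the guard once applied */
static void on_update_applied(int vgId) { clear_inflight(vgId); }

int main(void) {
    pullup_update(3);
    pullup_update(3);        /* skipped: still in flight */
    on_update_applied(3);
    pullup_update(3);        /* allowed again */
    return 0;
}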

View File

@ -847,7 +847,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
if (pIter == NULL) break;
maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
pStream->checkpointId);
sdbRelease(pSdb, pStream);
}

View File

@ -1156,14 +1156,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) {
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
" transId:%d already received, ignore this msg and continue process checkpoint",
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, ignore msg and continue process checkpoint",
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
} else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) {
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
}
streamProcessCheckpointSourceReq(pTask, &req);
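
This change makes the checkpoint-source handler idempotent: a trigger is ignored when the task is already in TASK_STATUS__CK for it, and now also when req.checkpointId is not newer than the checkpoint the task has already finished, so a re-delivered message cannot restart an old checkpoint. A simplified, self-contained sketch of that acceptance check follows; it is not the actual tq handler, and the Task struct and field names are stand-ins.

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

typedef struct {
    bool    inCheckpoint;      /* stands in for status == TASK_STATUS__CK */
    int64_t lastCheckpointId;  /* stands in for pTask->chkInfo.checkpointId */
} Task;

/* true if the incoming trigger should be processed, false if ignored */
static bool accept_checkpoint_trigger(Task *t, int64_t reqCheckpointId) {
    if (t->inCheckpoint) {
        printf("already in checkpoint, ignore repeated trigger %lld\n", (long long)reqCheckpointId);
        return false;
    }
    if (reqCheckpointId <= t->lastCheckpointId) {
        printf("checkpoint %lld already handled, discard\n", (long long)reqCheckpointId);
        return false;
    }
    return true;
}

int main(void) {
    Task t = {.inCheckpoint = false, .lastCheckpointId = 5};
    printf("%d\n", accept_checkpoint_trigger(&t, 5));  /* 0: already handled */
    printf("%d\n", accept_checkpoint_trigger(&t, 6));  /* 1: new checkpoint */
    return 0;
}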

View File

@ -485,6 +485,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) {
taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it
return TSDB_CODE_FAILED;
}

View File

@ -897,6 +897,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
// commit
code = commit_edit(fs);
ASSERT(code == 0);
TSDB_CHECK_CODE(code, lino, _exit);
// schedule merge
@ -973,11 +974,11 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
tsem_post(&fs->canEdit);
tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
}
tsem_post(&fs->canEdit);
return code;
}
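
Two things change in tsdbFSEditCommit: the ASSERT on commit_edit becomes a TSDB_CHECK_CODE that propagates the error to _exit, and tsem_post(&fs->canEdit) moves out of the success branch so the semaphore is released on every exit path instead of only after a successful commit. A small standalone sketch of the second point, using a POSIX semaphore directly:

#include <stdio.h>
#include <semaphore.h>

static sem_t canEdit;

static int do_commit(int fail) { return fail ? -1 : 0; }

static int edit_commit(int fail) {
    int code = do_commit(fail);
    if (code) {
        fprintf(stderr, "commit failed\n");
    } else {
        printf("commit done\n");
    }
    sem_post(&canEdit);   /* always release, success or failure */
    return code;
}

int main(void) {
    sem_init(&canEdit, 0, 0);
    edit_commit(1);       /* even the failing path posts the semaphore */
    edit_commit(0);
    sem_destroy(&canEdit);
    return 0;
}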

View File

@ -580,9 +580,9 @@ int32_t tsdbMerge(void *arg) {
}
*/
// do merge
tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
tsdbInfo("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
code = tsdbDoMerge(merger);
tsdbDebug("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid);
tsdbInfo("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:

View File

@ -69,6 +69,7 @@ typedef struct {
int64_t chkpId;
char* dbPrefixPath;
} SStreamTaskSnap;
struct STokenBucket {
int32_t numCapacity; // total capacity, available token per second
int32_t numOfToken; // total available tokens
@ -148,18 +149,19 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key);
typedef enum UPLOAD_TYPE {
UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0,
UPLOAD_RSYNC = 1,
} UPLOAD_TYPE;
typedef enum ECHECKPOINT_BACKUP_TYPE {
DATA_UPLOAD_DISABLE = -1,
DATA_UPLOAD_S3 = 0,
DATA_UPLOAD_RSYNC = 1,
} ECHECKPOINT_BACKUP_TYPE;
UPLOAD_TYPE getUploadType();
int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
int downloadCheckpointByName(char* id, char* fname, char* dstName);
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskBackupCheckpoint(char* id, char* path);
int32_t downloadCheckpoint(char* id, char* path);
int32_t deleteCheckpoint(char* id);
int32_t deleteCheckpointFile(char* id, char* name);
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -376,10 +376,10 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char
return code;
}
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
UPLOAD_TYPE type = getUploadType();
if (type == UPLOAD_S3) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_S3) {
return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath);
} else if (type == UPLOAD_RSYNC) {
} else if (type == DATA_UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath);
}
return -1;
@ -2111,11 +2111,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
}
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
STaskDbWrapper* pDb = arg;
UPLOAD_TYPE utype = type;
ECHECKPOINT_BACKUP_TYPE utype = type;
if (utype == UPLOAD_RSYNC) {
if (utype == DATA_UPLOAD_RSYNC) {
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == UPLOAD_S3) {
} else if (utype == DATA_UPLOAD_S3) {
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
}
return -1;

View File

@ -26,16 +26,22 @@ static void rspMonitorFn(void* param, void* tmrId);
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
static int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
static void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
static void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
int64_t reqId, int32_t* pNotReady, const char* id);
static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
// check status
void streamTaskCheckDownstream(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
const char* idstr = pTask->id.idStr;
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
@ -51,16 +57,15 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId);
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
" window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
@ -70,25 +75,23 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr,
numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.reqId = tGenIdPI64();
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId);
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
processDownstreamReadyRsp(pTask);
}
}
@ -158,31 +161,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0;
}
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
taosThreadMutexLock(&pInfo->checkInfoLock);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosArrayPush(pInfo->pList, &info);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
}
@ -307,10 +291,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
if (reqId != p->reqId) {
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64
" expired check-rsp recv from downstream task:0x%x, discarded",
stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;
@ -341,7 +323,8 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
pInfo->inCheckProcess = 1;
} else {
ASSERT(pInfo->startTs > 0);
stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs);
stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
pInfo->startTs);
return TSDB_CODE_FAILED;
}
@ -355,7 +338,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
}
if (!pInfo->inCheckProcess) {
stWarn("s-task:%s already not in-check-procedure", id);
// stWarn("s-task:%s already not in-check-procedure", id);
}
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
@ -378,6 +361,24 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
return 0;
}
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) {
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0};
taosThreadMutexLock(&pInfo->checkInfoLock);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
taosArrayPush(pInfo->pList, &info);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
@ -389,9 +390,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = p->reqId;
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
@ -404,9 +405,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (p->taskId == pVgInfo->taskId) {
req.reqId = p->reqId;
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
@ -423,7 +422,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
if (p->status == TASK_DOWNSTREAM_READY) {
@ -447,6 +445,78 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i
}
}
void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) {
pReq->reqId = reqId;
pReq->downstreamTaskId = dstTaskId;
pReq->downstreamNodeId = dstNodeId;
}
void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
ASSERT(pTask->status.downstreamReady == 0);
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
}
}
pInfo->timeoutRetryCount += 1;
// timeout more than 100 sec, add into node update list
if (pInfo->timeoutRetryCount > 10) {
pInfo->timeoutRetryCount = 0;
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
addIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
id, vgId, p->taskId, p->vgId);
}
}
stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
} else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
}
}
void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
int32_t numOfNotReady = taosArrayGetSize(pNotReadyList);
ASSERT(pTask->status.downstreamReady == 0);
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
}
pInfo->notReadyRetryCount += 1;
stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
@ -461,6 +531,7 @@ void rspMonitorFn(void* param, void* tmrId) {
int32_t numOfNotRsp = 0;
int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0;
int32_t total = taosArrayGetSize(pInfo->pList);
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
@ -510,7 +581,7 @@ void rspMonitorFn(void* param, void* tmrId) {
numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total);
if (numOfFault > 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
@ -550,57 +621,18 @@ void rspMonitorFn(void* param, void* tmrId) {
}
if (numOfNotReady > 0) { // check to make sure not in recheck timer
ASSERT(pTask->status.downstreamReady == 0);
// reset the info, and send the check msg to failure downstream again
for (int32_t i = 0; i < numOfNotReady; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
}
handleNotReadyDownstreamTask(pTask, pNotReadyList);
}
pInfo->notReadyRetryCount += 1;
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id,
numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs);
}
// todo add into node update list and send to mnode
if (numOfTimeout > 0) {
ASSERT(pTask->status.downstreamReady == 0);
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId);
if (p != NULL) {
ASSERT(p->status == -1 && p->rspTs == 0);
doSendCheckMsg(pTask, p);
}
}
pInfo->timeoutRetryCount += 1;
// timeout more than 100 sec, add into node update list
if (pInfo->timeoutRetryCount > 10) {
pInfo->timeoutRetryCount = 0;
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId,
numOfTimeout);
} else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
}
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
}
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
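
The rsp-monitor refactor moves the per-case handling into handleNotReadyDownstreamTask and handleTimeoutDownstreamTasks; the latter re-sends the check message to each timed-out downstream and, once timeoutRetryCount exceeds 10 (the "more than 100 sec" comment in the diff), resets the counter and adds each task's vgId, newly carried in SDownstreamStatusInfo, to the node-update list. Below is a self-contained sketch of that retry-then-escalate logic; resend_check and add_to_node_update_list are hypothetical stand-ins for doSendCheckMsg and addIntoNodeUpdateList.

#include <stdio.h>

#define TIMEOUT_RETRY_LIMIT 10   /* matches the "more than 100 sec" comment in the diff */

typedef struct {
    int taskId;
    int vgId;
} Downstream;

/* hypothetical stand-ins for doSendCheckMsg and addIntoNodeUpdateList */
static void resend_check(const Downstream *d)            { printf("re-send check to task 0x%x\n", (unsigned)d->taskId); }
static void add_to_node_update_list(const Downstream *d) { printf("add vgId:%d to node-update list\n", d->vgId); }

/* mirrors handleTimeoutDownstreamTasks: retry first, escalate after the retry budget */
static void handle_timeout(Downstream *list, int n, int *retryCount) {
    for (int i = 0; i < n; i++) resend_check(&list[i]);

    *retryCount += 1;
    if (*retryCount > TIMEOUT_RETRY_LIMIT) {
        *retryCount = 0;
        for (int i = 0; i < n; i++) add_to_node_update_list(&list[i]);
    }
}

int main(void) {
    Downstream timedOut[] = {{0x1001, 2}, {0x1002, 3}};
    int retry = 10;                        /* one retry away from escalation */
    handle_timeout(timedOut, 2, &retry);   /* escalates: both vgIds reported */
    return 0;
}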

View File

@ -19,7 +19,7 @@
#include "streamInt.h"
typedef struct {
UPLOAD_TYPE type;
ECHECKPOINT_BACKUP_TYPE type;
char* taskId;
int64_t chkpId;
@ -416,7 +416,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) {
return code;
}
int32_t doUploadChkp(void* param) {
int32_t uploadCheckpointData(void* param) {
SAsyncUploadArg* arg = param;
char* path = NULL;
int32_t code = 0;
@ -426,13 +426,13 @@ int32_t doUploadChkp(void* param) {
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
if (arg->type == UPLOAD_S3) {
if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
}
}
if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) {
if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
}
@ -459,8 +459,8 @@ int32_t doUploadChkp(void* param) {
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload
UPLOAD_TYPE type = getUploadType();
if (type == UPLOAD_DISABLE) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) {
return 0;
}
@ -474,7 +474,7 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
arg->chkpId = chkpId;
arg->pTask = pTask;
return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL);
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
@ -558,7 +558,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
static int uploadCheckpointToS3(char* id, char* path) {
static int32_t uploadCheckpointToS3(char* id, char* path) {
TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1;
@ -590,8 +590,8 @@ static int uploadCheckpointToS3(char* id, char* path) {
return 0;
}
static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
int code = 0;
static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname);
if (s3GetObjectToFile(buf, dstName) != 0) {
@ -601,19 +601,19 @@ static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
return code;
}
UPLOAD_TYPE getUploadType() {
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
if (strlen(tsSnodeAddress) != 0) {
return UPLOAD_RSYNC;
return DATA_UPLOAD_RSYNC;
} else if (tsS3StreamEnabled) {
return UPLOAD_S3;
return DATA_UPLOAD_S3;
} else {
return UPLOAD_DISABLE;
return DATA_UPLOAD_DISABLE;
}
}
int uploadCheckpoint(char* id, char* path) {
int32_t streamTaskBackupCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("uploadCheckpoint parameters invalid");
stError("streamTaskBackupCheckpoint parameters invalid");
return -1;
}
if (strlen(tsSnodeAddress) != 0) {
@ -625,7 +625,7 @@ int uploadCheckpoint(char* id, char* path) {
}
// fileName: CURRENT
int downloadCheckpointByName(char* id, char* fname, char* dstName) {
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
stError("uploadCheckpointByName parameters invalid");
return -1;
@ -638,7 +638,7 @@ int downloadCheckpointByName(char* id, char* fname, char* dstName) {
return 0;
}
int downloadCheckpoint(char* id, char* path) {
int32_t downloadCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("downloadCheckpoint parameters invalid");
return -1;
@ -651,7 +651,7 @@ int downloadCheckpoint(char* id, char* path) {
return 0;
}
int deleteCheckpoint(char* id) {
int32_t deleteCheckpoint(char* id) {
if (id == NULL || strlen(id) == 0) {
stError("deleteCheckpoint parameters invalid");
return -1;
@ -664,7 +664,7 @@ int deleteCheckpoint(char* id) {
return 0;
}
int deleteCheckpointFile(char* id, char* name) {
int32_t deleteCheckpointFile(char* id, char* name) {
char object[128] = {0};
snprintf(object, sizeof(object), "%s/%s", id, name);
char* tmp = object;

View File

@ -17,7 +17,6 @@
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
#define WAIT_FOR_DURATION 10
// todo refactor:
// read data from input queue

View File

@ -185,8 +185,8 @@ class TDTestCase:
# baseVersion = "3.0.1.8"
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/com_alltypedata.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database curdb '")
@ -196,49 +196,81 @@ class TDTestCase:
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select min(ui) from curdb.meters '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select max(bi) from curdb.meters '")
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream trigger at_once into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
self.alter_string_in_file("0-others/tmqBasic.json", "/etc/taos/", cPath)
# os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
# create db/stb/select topic
db_topic = "db_test_topic"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {db_topic} with meta as database test" ')
stable_topic = "stable_test_meters_topic"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {stable_topic} as stable test.meters where tbname like \\"d3\\";" ')
select_topic = "select_test_meters_topic"
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {select_topic} as select current,voltage,phase from test.meters where voltage >= 170;" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
os.system(f" /usr/bin/taosadapter --version " )
consumer_dict = {
"group.id": "g1",
"td.connect.websocket.scheme": "ws",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
}
consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
consumer = taosws.Consumer(consumer_dict)
try:
consumer.subscribe(["tmq_test_topic"])
consumer.subscribe([select_topic])
except TmqError:
tdLog.exit(f"subscribe error")
first_consumer_rows = 0
while True:
message = consumer.poll(timeout=1.0)
if message:
print("message")
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
consumer.commit(message)
first_consumer_rows += block.nrows()
else:
print("break")
tdLog.notice("message is null and break")
break
consumer.commit(message)
tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
break
consumer.close()
# consumer_dict2 = {
# "group.id": "g2",
# "td.connect.websocket.scheme": "ws",
# "td.connect.user": "root",
# "td.connect.pass": "taosdata",
# "auto.offset.reset": "earliest",
# "enable.auto.commit": "false",
# }
# consumer = taosws.Consumer(consumer_dict2)
# try:
# consumer.subscribe([db_topic,stable_topic])
# except TmqError:
# tdLog.exit(f"subscribe error")
# first_consumer_rows = 0
# while True:
# message = consumer.poll(timeout=1.0)
# if message:
# for block in message:
# first_consumer_rows += block.nrows()
# else:
# tdLog.notice("message is null and break")
# break
# consumer.commit(message)
# tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
# break
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
@ -279,11 +311,10 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
# tdsql.query("show streams;")
# os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
# tdsql.query("show streams;")
# tdsql.query(f"select count(*) from {stb}")
# tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdsql.query("show streams;")
tdsql.checkRows(2)
# checkout db4096
tdsql.query("select count(*) from db4096.stb0")
@ -334,7 +365,7 @@ class TDTestCase:
# check stream
tdsql.query("show streams;")
tdsql.checkRows(0)
tdsql.checkRows(2)
#check TS-3131
tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
@ -348,39 +379,48 @@ class TDTestCase:
print("The unordered list is the same as the ordered list.")
else:
tdLog.exit("The unordered list is not the same as the ordered list.")
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
# check tmq
tdsql.execute("insert into test.d80 values (now+1s, 11, 190, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
conn = taos.connect()
consumer = Consumer(
{
"group.id": "tg75",
"client.id": "124",
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
"experimental.snapshot.enable": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
consumer.subscribe([select_topic])
consumer_rows = 0
while True:
res = consumer.poll(10)
if not res:
message = consumer.poll(timeout=1.0)
tdLog.info(f" null {message}")
if message:
for block in message:
consumer_rows += block.nrows()
tdLog.info(f"consumer rows is {consumer_rows}")
else:
print("consumer has completed and break")
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
consumer.close()
tdsql.query("select current,voltage,phase from test.meters where voltage >= 170;")
all_rows = tdsql.queryRows
if consumer_rows < all_rows - first_consumer_rows :
tdLog.exit(f"consumer rows is {consumer_rows}, less than {all_rows - first_consumer_rows}")
tdsql.query("show topics;")
tdsql.checkRows(1)
tdsql.checkRows(3)
tdsql.execute(f"drop topic {select_topic};",queryTimes=10)
tdsql.execute(f"drop topic {db_topic};",queryTimes=10)
tdsql.execute(f"drop topic {stable_topic};",queryTimes=10)
os.system(f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
def stop(self):
tdSql.close()

View File

@ -62,7 +62,7 @@ class TDTestCase:
while True:
res = consumer.poll(1)
if not res:
break
continue
val = res.value()
if val is None:
continue
@ -173,7 +173,7 @@ class TDTestCase:
while True:
res = consumer.poll(1)
if not res:
break
continue
val = res.value()
if val is None:
continue
@ -282,7 +282,7 @@ class TDTestCase:
while True:
res = consumer.poll(1)
if not res:
break
continue
val = res.value()
if val is None:
continue
@ -391,7 +391,7 @@ class TDTestCase:
while True:
res = consumer.poll(1)
if not res:
break
continue
val = res.value()
if val is None:
continue

View File

@ -1866,6 +1866,29 @@ int sml_td29691_Test() {
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN);
taos_free_result(pRes);
//check column tag name duplication when update
const char *sql7[] = {
"vbin,t1=1,t2=2,f1=ewe f2=b\"hello\" 1632299372003",
};
pRes = taos_schemaless_insert(taos, (char **)sql7, sizeof(sql7) / sizeof(sql7[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN);
taos_free_result(pRes);
//check column tag name duplication when update
const char *sql6[] = {
"vbin,t1=1 t2=2,f1=1,f2=b\"hello\" 1632299372004",
};
pRes = taos_schemaless_insert(taos, (char **)sql6, sizeof(sql6) / sizeof(sql6[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN);
taos_free_result(pRes);
taos_close(taos);
return code;