diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 321ca9eb77..4423bf94b6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -435,6 +435,7 @@ typedef struct SUpstreamInfo { typedef struct SDownstreamStatusInfo { int64_t reqId; int32_t taskId; + int32_t vgId; int64_t rspTs; int32_t status; } SDownstreamStatusInfo; @@ -847,12 +848,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); -int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, - int32_t* pNotReady, const char* id); -void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 16c95aa2d3..5a83cdc6a8 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -159,7 +159,7 @@ done tools=(${clientName} ${benchmarkName} ${dumpName} ${demoName} remove.sh udfd set_core.sh TDinsight.sh start_pre.sh) if [ "${verMode}" == "cluster" ]; then services=(${serverName} ${adapterName} ${xname} ${explorerName} ${keeperName}) -elif [ "${verMode}" == "community" ]; then +elif [ "${verMode}" == "edge" ]; then if [ "${pagMode}" == "full" ]; then services=(${serverName} ${adapterName} ${keeperName} ${explorerName}) else @@ -229,6 +229,10 @@ function install_bin() { fi fi + if [ -f ${script_dir}/bin/quick_deploy.sh ]; then + ${csudo}cp -r ${script_dir}/bin/quick_deploy.sh ${install_main_dir}/bin + fi + ${csudo}chmod 0555 ${install_main_dir}/bin/* [ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}mv ${install_main_dir}/bin/remove.sh ${install_main_dir}/uninstall.sh || : @@ -503,7 +507,7 @@ function local_fqdn_check() { function install_taosx_config() { [ ! -z $1 ] && return 0 || : # only install client - fileName="${script_dir}/${xname}/etc/taos/${xname}.toml" + fileName="${script_dir}/${xname}/etc/${PREFIX}/${xname}.toml" if [ -f ${fileName} ]; then ${csudo}sed -i -r "s/#*\s*(fqdn\s*=\s*).*/\1\"${serverFqdn}\"/" ${fileName} @@ -520,10 +524,11 @@ function install_explorer_config() { [ ! -z $1 ] && return 0 || : # only install client if [ "$verMode" == "cluster" ]; then - fileName="${script_dir}/${xname}/etc/taos/explorer.toml" + fileName="${script_dir}/${xname}/etc/${PREFIX}/explorer.toml" else fileName="${script_dir}/cfg/explorer.toml" - } + fi + if [ -f ${fileName} ]; then ${csudo}sed -i "s/localhost/${serverFqdn}/g" ${fileName} @@ -655,13 +660,13 @@ function install_connector() { function install_examples() { if [ -d ${script_dir}/examples ]; then - ${csudo}cp -rf ${script_dir}/examples/* ${install_main_dir}/examples || echo "failed to copy examples" + ${csudo}cp -rf ${script_dir}/examples ${install_main_dir}/ || echo "failed to copy examples" fi } function install_plugins() { if [ -d ${script_dir}/${xname}/plugins ]; then - ${csudo}cp -rf ${script_dir}/${xname}/plugins/ ${install_main_dir}/ || echo "failed to copy taosx plugins" + ${csudo}cp -rf ${script_dir}/${xname}/plugins/ ${install_main_dir}/ || echo "failed to copy ${PREFIX}x plugins" fi } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 1681fd1551..6225cf703c 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -832,7 +832,7 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) { return result; } -static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, +static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray *checkDumplicateCols, ESchemaAction *action, bool isTag) { int32_t code = TSDB_CODE_SUCCESS; for (int j = 0; j < taosArrayGetSize(cols); ++j) { @@ -843,6 +843,13 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH return code; } } + + for (int j = 0; j < taosArrayGetSize(checkDumplicateCols); ++j) { + SSmlKv *kv = (SSmlKv *)taosArrayGet(checkDumplicateCols, j); + if(taosHashGet(schemaHash, kv->key, kv->keyLen) != NULL){ + return TSDB_CODE_PAR_DUPLICATED_COLUMN; + } + } return TSDB_CODE_SUCCESS; } @@ -1106,7 +1113,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } ESchemaAction action = SCHEMA_ACTION_NULL; - code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true); + code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true); if (code != TSDB_CODE_SUCCESS) { goto end; } @@ -1181,7 +1188,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES); } action = SCHEMA_ACTION_NULL; - code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false); + code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false); if (code != TSDB_CODE_SUCCESS) { goto end; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 4dfe9d900f..14e8088dfe 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -191,8 +191,8 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, - {.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 9a5392ca1f..1ac762e004 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -272,7 +272,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 300; +int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 16; int32_t tsTtlUnit = 86400; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 75d5669824..885086e37a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "mmInt.h" +#define PROCESS_THRESHOLD (2000 * 1000) + static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { int32_t code = 0; taosThreadRwlockRdlock(&pMgmt->lock); @@ -53,6 +55,14 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = mndProcessRpcMsg(pMsg); + if (pInfo->timestamp != 0) { + int64_t cost = taosGetTimestampUs() - pInfo->timestamp; + if (cost > PROCESS_THRESHOLD) { + dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum, + TMSG_INFO(pMsg->msgType), cost / (1000 * 1000)); + } + } + if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 92ab5274e4..d0a86bdde7 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -27,6 +27,8 @@ #define ARBGROUP_VER_NUMBER 1 #define ARBGROUP_RESERVE_SIZE 64 +static SHashObj *arbUpdateHash = NULL; + static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup); static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew); static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup); @@ -74,10 +76,14 @@ int32_t mndInitArbGroup(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup); + arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupArbGroup(SMnode *pMnode) {} +void mndCleanupArbGroup(SMnode *pMnode) { + taosHashCleanup(arbUpdateHash); +} SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) { SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId); @@ -221,8 +227,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64 " new version:%" PRId64, pOld->vgId, pOld, pNew, pOld->version, pNew->version); - taosThreadMutexUnlock(&pOld->mutex); - return 0; + goto _OVER; } for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { @@ -232,7 +237,11 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId; memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); pOld->version++; + +_OVER: taosThreadMutexUnlock(&pOld->mutex); + + taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)); return 0; } @@ -645,6 +654,11 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) } static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { + if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) { + mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId); + return 0; + } + int32_t contLen = 0; void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup); if (!pHead) { @@ -653,7 +667,11 @@ static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { } SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true}; - return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (ret == 0) { + taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); + } + return ret; } static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { @@ -930,8 +948,12 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { SVArbCheckSyncRsp syncRsp = {0}; if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) { - terrno = TSDB_CODE_INVALID_MSG; mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code)); + if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) { + terrno = TSDB_CODE_SUCCESS; + return 0; + } + terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d225c2e272..3dc26b4118 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -847,7 +847,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { if (pIter == NULL) break; maxChkptId = TMAX(maxChkptId, pStream->checkpointId); - mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, + mDebug("stream:%p, %s id:0x%" PRIx64 " checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, pStream->checkpointId); sdbRelease(pSdb, pStream); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3d5f5bd64c..0e6b85bd2b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1156,14 +1156,24 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { - tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 - " transId:%d already received, ignore this msg and continue process checkpoint", + tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 + " transId:%d already handled, ignore msg and continue process checkpoint", pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; + } else { // checkpoint already finished, and not in checkpoint status + if (req.checkpointId <= pTask->chkInfo.checkpointId) { + tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 + " transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId); + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + + return TSDB_CODE_SUCCESS; + } } streamProcessCheckpointSourceReq(pTask, &req); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f690b9b277..25c2f32307 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -485,6 +485,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) { + taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 6195c37ab2..7741a1b096 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -897,6 +897,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { // commit code = commit_edit(fs); + ASSERT(code == 0); TSDB_CHECK_CODE(code, lino, _exit); // schedule merge @@ -973,11 +974,11 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbDebug("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype); - tsem_post(&fs->canEdit); + tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype); } + tsem_post(&fs->canEdit); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 87f48acaac..971020e7d6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -580,9 +580,9 @@ int32_t tsdbMerge(void *arg) { } */ // do merge - tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid); + tsdbInfo("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid); code = tsdbDoMerge(merger); - tsdbDebug("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid); + tsdbInfo("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index b3ed86cff8..44fb0706b8 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -69,6 +69,7 @@ typedef struct { int64_t chkpId; char* dbPrefixPath; } SStreamTaskSnap; + struct STokenBucket { int32_t numCapacity; // total capacity, available token per second int32_t numOfToken; // total available tokens @@ -148,18 +149,19 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); void streamMetaRemoveDB(void* arg, char* key); -typedef enum UPLOAD_TYPE { - UPLOAD_DISABLE = -1, - UPLOAD_S3 = 0, - UPLOAD_RSYNC = 1, -} UPLOAD_TYPE; +typedef enum ECHECKPOINT_BACKUP_TYPE { + DATA_UPLOAD_DISABLE = -1, + DATA_UPLOAD_S3 = 0, + DATA_UPLOAD_RSYNC = 1, +} ECHECKPOINT_BACKUP_TYPE; -UPLOAD_TYPE getUploadType(); -int uploadCheckpoint(char* id, char* path); -int downloadCheckpoint(char* id, char* path); -int deleteCheckpoint(char* id); -int deleteCheckpointFile(char* id, char* name); -int downloadCheckpointByName(char* id, char* fname, char* dstName); +ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); + +int32_t streamTaskBackupCheckpoint(char* id, char* path); +int32_t downloadCheckpoint(char* id, char* path); +int32_t deleteCheckpoint(char* id); +int32_t deleteCheckpointFile(char* id, char* name); +int32_t downloadCheckpointByName(char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 0a2a4617fc..662d02a48f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -376,10 +376,10 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char return code; } int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - UPLOAD_TYPE type = getUploadType(); - if (type == UPLOAD_S3) { + ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); + if (type == DATA_UPLOAD_S3) { return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath); - } else if (type == UPLOAD_RSYNC) { + } else if (type == DATA_UPLOAD_RSYNC) { return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath); } return -1; @@ -2111,11 +2111,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { STaskDbWrapper* pDb = arg; - UPLOAD_TYPE utype = type; + ECHECKPOINT_BACKUP_TYPE utype = type; - if (utype == UPLOAD_RSYNC) { + if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); - } else if (utype == UPLOAD_S3) { + } else if (utype == DATA_UPLOAD_S3) { return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); } return -1; diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 42bf334fa3..d164d01934 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -18,7 +18,7 @@ #include "streamBackendRocksdb.h" #include "streamInt.h" -#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec +#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec static void processDownstreamReadyRsp(SStreamTask* pTask); static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); @@ -26,16 +26,22 @@ static void rspMonitorFn(void* param, void* tmrId); static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id); +static int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id); static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); -static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, - int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); - +static void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList); +static void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList); +static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, + int64_t reqId, int32_t* pNotReady, const char* id); +static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); +static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, + int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status void streamTaskCheckDownstream(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; + const char* idstr = pTask->id.idStr; SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -51,16 +57,15 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { streamTaskStartMonitorCheckRsp(pTask); - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; - req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + setCheckDownstreamReqInfo(&req, tGenIdPI64(), pDispatch->taskId, pDispatch->nodeId); + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, - pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, + pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); @@ -70,25 +75,23 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); - stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, idstr, + numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId); + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); + idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { // for sink task, set it ready directly. - stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); + stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); processDownstreamReadyRsp(pTask); } } @@ -158,31 +161,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { - SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; - - taosThreadMutexLock(&pInfo->checkInfoLock); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; - } - - taosArrayPush(pInfo->pList, &info); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; -} - int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; taosThreadMutexLock(&pInfo->checkInfoLock); int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { - taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; } @@ -307,10 +291,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); if (p != NULL) { - if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 - " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; @@ -341,7 +323,8 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { pInfo->inCheckProcess = 1; } else { ASSERT(pInfo->startTs > 0); - stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); + stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id, + pInfo->startTs); return TSDB_CODE_FAILED; } @@ -355,7 +338,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* } if (!pInfo->inCheckProcess) { - stWarn("s-task:%s already not in-check-procedure", id); +// stWarn("s-task:%s already not in-check-procedure", id); } int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; @@ -378,6 +361,24 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* return 0; } +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { + SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; + + taosThreadMutexLock(&pInfo->checkInfoLock); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + + taosArrayPush(pInfo->pList, &info); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} + void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -389,9 +390,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { STaskOutputInfo* pOutputInfo = &pTask->outputInfo; if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { - req.reqId = p->reqId; - req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; - req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; + STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; + setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); @@ -404,12 +405,10 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (p->taskId == pVgInfo->taskId) { - req.reqId = p->reqId; - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; + setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 - " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, + " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); @@ -423,7 +422,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); if (p->status == TASK_DOWNSTREAM_READY) { @@ -447,6 +445,78 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i } } +void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId) { + pReq->reqId = reqId; + pReq->downstreamTaskId = dstTaskId; + pReq->downstreamNodeId = dstNodeId; +} + +void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); + + ASSERT(pTask->status.downstreamReady == 0); + + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + ASSERT(p->status == -1 && p->rspTs == 0); + doSendCheckMsg(pTask, p); + } + } + + pInfo->timeoutRetryCount += 1; + + // timeout more than 100 sec, add into node update list + if (pInfo->timeoutRetryCount > 10) { + pInfo->timeoutRetryCount = 0; + + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + addIntoNodeUpdateList(pTask, p->vgId); + stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", + id, vgId, p->taskId, p->vgId); + } + } + + stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); + } else { + stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + } +} + +void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t numOfNotReady = taosArrayGetSize(pNotReadyList); + + ASSERT(pTask->status.downstreamReady == 0); + + // reset the info, and send the check msg to failure downstream again + for (int32_t i = 0; i < numOfNotReady; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + p->rspTs = 0; + p->status = -1; + doSendCheckMsg(pTask, p); + } + } + + pInfo->notReadyRetryCount += 1; + stDebug("s-task:%s vgId:%d %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, + vgId, numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); +} + void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamTaskState* pStat = streamTaskGetStatus(pTask); @@ -461,6 +531,7 @@ void rspMonitorFn(void* param, void* tmrId) { int32_t numOfNotRsp = 0; int32_t numOfNotReady = 0; int32_t numOfTimeout = 0; + int32_t total = taosArrayGetSize(pInfo->pList); stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); @@ -510,7 +581,7 @@ void rspMonitorFn(void* param, void* tmrId) { numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); if (numOfFault > 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( @@ -550,57 +621,18 @@ void rspMonitorFn(void* param, void* tmrId) { } if (numOfNotReady > 0) { // check to make sure not in recheck timer - ASSERT(pTask->status.downstreamReady == 0); - - // reset the info, and send the check msg to failure downstream again - for (int32_t i = 0; i < numOfNotReady; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - p->rspTs = 0; - p->status = -1; - doSendCheckMsg(pTask, p); - } - } - - pInfo->notReadyRetryCount += 1; - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, - numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); + handleNotReadyDownstreamTask(pTask, pNotReadyList); } - // todo add into node update list and send to mnode if (numOfTimeout > 0) { - ASSERT(pTask->status.downstreamReady == 0); - - for (int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - ASSERT(p->status == -1 && p->rspTs == 0); - doSendCheckMsg(pTask, p); - } - } - - pInfo->timeoutRetryCount += 1; - - // timeout more than 100 sec, add into node update list - if (pInfo->timeoutRetryCount > 10) { - pInfo->timeoutRetryCount = 0; - stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId, - numOfTimeout); - } else { - stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); - } + handleTimeoutDownstreamTasks(pTask, pTimeoutList); } taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, - numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", + id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8efd661d12..e6d7c2fde8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -19,7 +19,7 @@ #include "streamInt.h" typedef struct { - UPLOAD_TYPE type; + ECHECKPOINT_BACKUP_TYPE type; char* taskId; int64_t chkpId; @@ -416,7 +416,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) { return code; } -int32_t doUploadChkp(void* param) { +int32_t uploadCheckpointData(void* param) { SAsyncUploadArg* arg = param; char* path = NULL; int32_t code = 0; @@ -426,13 +426,13 @@ int32_t doUploadChkp(void* param) { (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } - if (arg->type == UPLOAD_S3) { + if (arg->type == DATA_UPLOAD_S3) { if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); } } - if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) { + if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); } @@ -459,8 +459,8 @@ int32_t doUploadChkp(void* param) { int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { // async upload - UPLOAD_TYPE type = getUploadType(); - if (type == UPLOAD_DISABLE) { + ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); + if (type == DATA_UPLOAD_DISABLE) { return 0; } @@ -474,7 +474,7 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { arg->chkpId = chkpId; arg->pTask = pTask; - return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); + return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { @@ -558,7 +558,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -static int uploadCheckpointToS3(char* id, char* path) { +static int32_t uploadCheckpointToS3(char* id, char* path) { TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) return -1; @@ -590,8 +590,8 @@ static int uploadCheckpointToS3(char* id, char* path) { return 0; } -static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { - int code = 0; +static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { + int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { @@ -601,19 +601,19 @@ static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { return code; } -UPLOAD_TYPE getUploadType() { +ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { if (strlen(tsSnodeAddress) != 0) { - return UPLOAD_RSYNC; + return DATA_UPLOAD_RSYNC; } else if (tsS3StreamEnabled) { - return UPLOAD_S3; + return DATA_UPLOAD_S3; } else { - return UPLOAD_DISABLE; + return DATA_UPLOAD_DISABLE; } } -int uploadCheckpoint(char* id, char* path) { +int32_t streamTaskBackupCheckpoint(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { - stError("uploadCheckpoint parameters invalid"); + stError("streamTaskBackupCheckpoint parameters invalid"); return -1; } if (strlen(tsSnodeAddress) != 0) { @@ -625,7 +625,7 @@ int uploadCheckpoint(char* id, char* path) { } // fileName: CURRENT -int downloadCheckpointByName(char* id, char* fname, char* dstName) { +int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { stError("uploadCheckpointByName parameters invalid"); return -1; @@ -638,7 +638,7 @@ int downloadCheckpointByName(char* id, char* fname, char* dstName) { return 0; } -int downloadCheckpoint(char* id, char* path) { +int32_t downloadCheckpoint(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("downloadCheckpoint parameters invalid"); return -1; @@ -651,7 +651,7 @@ int downloadCheckpoint(char* id, char* path) { return 0; } -int deleteCheckpoint(char* id) { +int32_t deleteCheckpoint(char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); return -1; @@ -664,7 +664,7 @@ int deleteCheckpoint(char* id) { return 0; } -int deleteCheckpointFile(char* id, char* name) { +int32_t deleteCheckpointFile(char* id, char* name) { char object[128] = {0}; snprintf(object, sizeof(object), "%s/%s", id, name); char* tmp = object; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 9f79501471..9e872a1aff 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -17,7 +17,6 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec -#define WAIT_FOR_DURATION 10 // todo refactor: // read data from input queue diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 4c2fe652b8..71adf6eaac 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -185,8 +185,8 @@ class TDTestCase: # baseVersion = "3.0.1.8" tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}") - tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") - os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ") + tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ") + os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -v 1 -y ") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/com_alltypedata.json -y") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database curdb '") @@ -196,49 +196,81 @@ class TDTestCase: os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select min(ui) from curdb.meters '") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'select max(bi) from curdb.meters '") - # os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") - # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') - # os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') + os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ") + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream trigger at_once into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ') + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ') + self.alter_string_in_file("0-others/tmqBasic.json", "/etc/taos/", cPath) # os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ") - os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ') + # create db/stb/select topic + + db_topic = "db_test_topic" + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {db_topic} with meta as database test" ') + + stable_topic = "stable_test_meters_topic" + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {stable_topic} as stable test.meters where tbname like \\"d3\\";" ') + + select_topic = "select_test_meters_topic" + os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists {select_topic} as select current,voltage,phase from test.meters where voltage >= 170;" ') + os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ') os.system(f" /usr/bin/taosadapter --version " ) consumer_dict = { "group.id": "g1", + "td.connect.websocket.scheme": "ws", "td.connect.user": "root", "td.connect.pass": "taosdata", "auto.offset.reset": "earliest", + "enable.auto.commit": "false", } - consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) + consumer = taosws.Consumer(consumer_dict) try: - consumer.subscribe(["tmq_test_topic"]) + consumer.subscribe([select_topic]) except TmqError: tdLog.exit(f"subscribe error") - + first_consumer_rows = 0 while True: message = consumer.poll(timeout=1.0) if message: - print("message") - id = message.vgroup() - topic = message.topic() - database = message.database() - for block in message: - nrows = block.nrows() - ncols = block.ncols() - for row in block: - print(row) - values = block.fetchall() - print(nrows, ncols) - - consumer.commit(message) + first_consumer_rows += block.nrows() else: - print("break") + tdLog.notice("message is null and break") break + consumer.commit(message) + tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version") + break consumer.close() + # consumer_dict2 = { + # "group.id": "g2", + # "td.connect.websocket.scheme": "ws", + # "td.connect.user": "root", + # "td.connect.pass": "taosdata", + # "auto.offset.reset": "earliest", + # "enable.auto.commit": "false", + # } + # consumer = taosws.Consumer(consumer_dict2) + # try: + # consumer.subscribe([db_topic,stable_topic]) + # except TmqError: + # tdLog.exit(f"subscribe error") + # first_consumer_rows = 0 + # while True: + # message = consumer.poll(timeout=1.0) + # if message: + # for block in message: + # first_consumer_rows += block.nrows() + # else: + # tdLog.notice("message is null and break") + # break + # consumer.commit(message) + # tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version") + # break + + + tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ") os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y") os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '") @@ -279,11 +311,10 @@ class TDTestCase: tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}") tdsql.query(f"select count(*) from {stb}") tdsql.checkData(0,0,tableNumbers*recordNumbers1) - # tdsql.query("show streams;") - # os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ") - # tdsql.query("show streams;") - # tdsql.query(f"select count(*) from {stb}") - # tdsql.checkData(0,0,tableNumbers*recordNumbers2) + tdsql.query("show streams;") + tdsql.checkRows(2) + + # checkout db4096 tdsql.query("select count(*) from db4096.stb0") @@ -334,7 +365,7 @@ class TDTestCase: # check stream tdsql.query("show streams;") - tdsql.checkRows(0) + tdsql.checkRows(2) #check TS-3131 tdsql.query("select *,tbname from d0.almlog where mcid='m0103';") @@ -348,39 +379,48 @@ class TDTestCase: print("The unordered list is the same as the ordered list.") else: tdLog.exit("The unordered list is not the same as the ordered list.") - tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);") - tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);") # check tmq + tdsql.execute("insert into test.d80 values (now+1s, 11, 190, 0.21);") + tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);") conn = taos.connect() consumer = Consumer( { - "group.id": "tg75", - "client.id": "124", + "group.id": "g1", "td.connect.user": "root", "td.connect.pass": "taosdata", "enable.auto.commit": "true", "experimental.snapshot.enable": "true", } ) - consumer.subscribe(["tmq_test_topic"]) - + consumer.subscribe([select_topic]) + consumer_rows = 0 while True: - res = consumer.poll(10) - if not res: + message = consumer.poll(timeout=1.0) + tdLog.info(f" null {message}") + if message: + for block in message: + consumer_rows += block.nrows() + tdLog.info(f"consumer rows is {consumer_rows}") + else: + print("consumer has completed and break") break - err = res.error() - if err is not None: - raise err - val = res.value() - - for block in val: - print(block.fetchall()) + consumer.close() + tdsql.query("select current,voltage,phase from test.meters where voltage >= 170;") + all_rows = tdsql.queryRows + if consumer_rows < all_rows - first_consumer_rows : + tdLog.exit(f"consumer rows is {consumer_rows}, less than {all_rows - first_consumer_rows}") tdsql.query("show topics;") - tdsql.checkRows(1) + tdsql.checkRows(3) + tdsql.execute(f"drop topic {select_topic};",queryTimes=10) + tdsql.execute(f"drop topic {db_topic};",queryTimes=10) + tdsql.execute(f"drop topic {stable_topic};",queryTimes=10) + os.system(f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ") + tdsql.query(f"select count(*) from {stb}") + tdsql.checkData(0,0,tableNumbers*recordNumbers2) def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index 8f62dc8783..7d0d3da4fc 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -62,7 +62,7 @@ class TDTestCase: while True: res = consumer.poll(1) if not res: - break + continue val = res.value() if val is None: continue @@ -173,7 +173,7 @@ class TDTestCase: while True: res = consumer.poll(1) if not res: - break + continue val = res.value() if val is None: continue @@ -282,7 +282,7 @@ class TDTestCase: while True: res = consumer.poll(1) if not res: - break + continue val = res.value() if val is None: continue @@ -391,7 +391,7 @@ class TDTestCase: while True: res = consumer.poll(1) if not res: - break + continue val = res.value() if val is None: continue diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 6c6144244f..5e9d82e71e 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1866,6 +1866,29 @@ int sml_td29691_Test() { printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN); taos_free_result(pRes); + + //check column tag name duplication when update + const char *sql7[] = { + "vbin,t1=1,t2=2,f1=ewe f2=b\"hello\" 1632299372003", + }; + pRes = taos_schemaless_insert(taos, (char **)sql7, sizeof(sql7) / sizeof(sql7[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN); + taos_free_result(pRes); + + //check column tag name duplication when update + const char *sql6[] = { + "vbin,t1=1 t2=2,f1=1,f2=b\"hello\" 1632299372004", + }; + pRes = taos_schemaless_insert(taos, (char **)sql6, sizeof(sql6) / sizeof(sql6[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN); + taos_free_result(pRes); + taos_close(taos); return code;