diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index 83b0fe5ac4..d05bf1139c 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w import Release from "/components/ReleaseV3"; +## 3.0.7.1 + + + ## 3.0.7.0 diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 67718d59bf..52bb9c87a0 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do import Release from "/components/ReleaseV3"; +## 3.0.7.1 + + + ## 3.0.7.0 diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 657435e5ff..d6c552b3f6 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -34,6 +34,7 @@ extern char tsFirst[]; extern char tsSecond[]; extern char tsLocalFqdn[]; extern char tsLocalEp[]; +extern char tsVersionName[]; extern uint16_t tsServerPort; extern int32_t tsVersion; extern int32_t tsStatusInterval; diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 622cd615b8..509f8dc9e8 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -69,6 +69,13 @@ void tfsUpdateSize(STfs *pTfs); */ SDiskSize tfsGetSize(STfs *pTfs); +/** + * @brief Get the number of disks at level of multi-tier storage. + * + * @param pTfs + * @return int32_t + */ +int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level); /** * @brief Get level of multi-tier storage. * @@ -123,6 +130,15 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname); */ int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId); +/** + * @brief Recursive make directory at all levels in tfs. + * + * @param pTfs The fs object. + * @param rname The rel name of directory. + * @return int32_t 0 for success, -1 for failure. + */ +int32_t tfsMkdirRecur(STfs *pTfs, const char *rname); + /** * @brief Recursive create directories in tfs. * @@ -160,7 +176,17 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname); * @param nrname The rel name of new file. * @return int32_t 0 for success, -1 for failure. */ -int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname); +int32_t tfsRename(STfs *pTfs, int32_t diskPrimary, const char *orname, const char *nrname); + +/** + * @brief Search fname in level of tfs + * + * @param pTfs The fs object. + * @param level The level to search on + * @param fname The relative file name to be searched + * @param int32_t diskId for successs, -1 for failure + */ +int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname); /** * @brief Init file object in tfs. diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index a8ccb67bfb..b5309178ae 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -36,7 +36,7 @@ typedef struct { bool taosCheckSystemIsLittleEnd(); void taosGetSystemInfo(); int32_t taosGetEmail(char *email, int32_t maxLen); -int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen); +int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t maxLen); int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); int32_t taosGetCpuCores(float *numOfCores); void taosGetCpuUsage(double *cpu_system, double *cpu_engine); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 06fdfa67d5..ae105aa1dc 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -416,7 +416,7 @@ int32_t* taosGetErrno(); // #define TSDB_CODE_VND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0501) // 2.x // #define TSDB_CODE_VND_ACTION_NEED_REPROCESS. TAOS_DEF_ERROR_CODE(0, 0x0502) // 2.x #define TSDB_CODE_VND_INVALID_VGROUP_ID TAOS_DEF_ERROR_CODE(0, 0x0503) -// #define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504) // 2.x +#define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504) // #define TSDB_CODE_VND_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0505) // 2.x // #define TSDB_CODE_VND_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0506) // 2.x // #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) // 2.x diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 9c6d941172..356ea2be1c 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,4 +1,8 @@ aux_source_directory(src COMMON_SRC) +IF (TD_ENTERPRISE) +LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c) +ENDIF() + add_library(common STATIC ${COMMON_SRC}) if (DEFINED GRANT_CFG_INCLUDE_DIR) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3545ece6d8..611a273353 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -34,6 +34,7 @@ char tsFirst[TSDB_EP_LEN] = {0}; char tsSecond[TSDB_EP_LEN] = {0}; char tsLocalFqdn[TSDB_FQDN_LEN] = {0}; char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port +char tsVersionName[16] = "community"; uint16_t tsServerPort = 6030; int32_t tsVersion = 30000000; int32_t tsStatusInterval = 1; // second @@ -938,6 +939,12 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { return 0; } +#ifndef TD_ENTERPRISE +static int32_t taosSetReleaseCfg(SConfig *pCfg) { return 0; } +#else +int32_t taosSetReleaseCfg(SConfig *pCfg); +#endif + void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; @@ -1444,6 +1451,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile if (taosSetClientCfg(tsCfg)) return -1; if (taosUpdateServerCfg(tsCfg)) return -1; if (taosSetServerCfg(tsCfg)) return -1; + if (taosSetReleaseCfg(tsCfg)) return -1; if (taosSetTfsCfg(tsCfg) != 0) return -1; } taosSetSystemCfg(tsCfg); @@ -1490,14 +1498,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) { if (strcasecmp(option, "keepTimeOffset") == 0) { int32_t newKeepTimeOffset = atoi(value); - if (newKeepTimeOffset < 0 || newKeepTimeOffset > 23) { - uError("failed to set keepTimeOffset from %d to %d. Valid range: [0, 23]", tsKeepTimeOffset, newKeepTimeOffset); - return; - } - uInfo("keepTimeOffset set from %d to %d", tsKeepTimeOffset, newKeepTimeOffset); tsKeepTimeOffset = newKeepTimeOffset; - return; } diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index c195f5387c..95a5c27cf1 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -109,7 +109,7 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t taosGetAppName(tmp, NULL); tjsonAddStringToObject(pJson, "appName", tmp); - if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) { + if (taosGetOsReleaseName(tmp, NULL, NULL, sizeof(tmp)) == 0) { tjsonAddStringToObject(pJson, "os", tmp); } diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 01a9a245be..e1b8a57684 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -359,7 +359,11 @@ int mainWindows(int argc, char **argv) { taosCleanupArgs(); if (dmInit() != 0) { - dError("failed to init dnode since %s", terrstr()); + if (terrno == TSDB_CODE_NOT_FOUND) { + dError("failed to init dnode since unsupported platform, please visit https://www.taosdata.com for support"); + } else { + dError("failed to init dnode since %s", terrstr()); + } taosCleanupCfg(); taosCloseLog(); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index d1dc872f4b..5d08320fab 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -46,6 +46,7 @@ typedef struct { int32_t vgId; int32_t vgVersion; int8_t dropped; + int32_t diskPrimary; int32_t toVgId; char path[PATH_MAX + 20]; } SWrapperCfg; @@ -56,6 +57,7 @@ typedef struct { int32_t refCount; int8_t dropped; int8_t disable; + int32_t diskPrimary; int32_t toVgId; char *path; SVnode *pImpl; @@ -81,6 +83,7 @@ typedef struct { } SVnodeThread; // vmInt.c +int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId); SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId); void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 769b274f7f..da7f4d4a56 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg ** if (code < 0) goto _OVER; tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code); if (code < 0) goto _OVER; + tjsonGetInt32ValueFromDouble(vnode, "diskPrimary", pCfg->diskPrimary, code); + if (code < 0) goto _OVER; tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code); if (code < 0) goto _OVER; @@ -167,6 +169,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1; + if (tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary) < 0) return -1; if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1; if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7e9f1f795f..d48bd3f847 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -263,16 +263,19 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } + wrapperCfg.diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId); + int32_t diskPrimary = wrapperCfg.diskPrimary; + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); - if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { + if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) { tFreeSCreateVnodeReq(&req); dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); code = terrno; goto _OVER; } - SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr()); code = terrno; @@ -403,21 +406,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { .dropped = pVnode->dropped, .vgId = pVnode->vgId, .vgVersion = pVnode->vgVersion, + .diskPrimary = pVnode->diskPrimary, }; tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); vmCloseVnode(pMgmt, pVnode, false); + int32_t diskPrimary = wrapperCfg.diskPrimary; char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); - if (vnodeAlterReplica(path, &req, pMgmt->pTfs) < 0) { + if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) { dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); return -1; } dInfo("vgId:%d, begin to open vnode", vgId); - SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); return -1; @@ -490,6 +495,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { .dropped = pVnode->dropped, .vgId = dstVgId, .vgVersion = pVnode->vgVersion, + .diskPrimary = pVnode->diskPrimary, }; tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); @@ -503,19 +509,20 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, close vnode", srcVgId); vmCloseVnode(pMgmt, pVnode, true); + int32_t diskPrimary = wrapperCfg.diskPrimary; char srcPath[TSDB_FILENAME_LEN] = {0}; char dstPath[TSDB_FILENAME_LEN] = {0}; snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId); snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath); - if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) { + if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) { dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr()); return -1; } dInfo("vgId:%d, open vnode", dstVgId); - SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb); + SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr()); return -1; @@ -602,21 +609,23 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { .dropped = pVnode->dropped, .vgId = pVnode->vgId, .vgVersion = pVnode->vgVersion, + .diskPrimary = pVnode->diskPrimary, }; tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); vmCloseVnode(pMgmt, pVnode, false); + int32_t diskPrimary = wrapperCfg.diskPrimary; char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); - if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) { + if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) { dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); return -1; } dInfo("vgId:%d, begin to open vnode", vgId); - SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index db46ce3ca0..94a753062c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -15,8 +15,64 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +#include "tfs.h" #include "vnd.h" +int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { + STfs *pTfs = pMgmt->pTfs; + int32_t diskId = 0; + if (!pTfs) { + return diskId; + } + + // search fs + char vnodePath[TSDB_FILENAME_LEN] = {0}; + snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId); + char fname[TSDB_FILENAME_LEN] = {0}; + char fnameTmp[TSDB_FILENAME_LEN] = {0}; + snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME); + snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP); + + diskId = tfsSearch(pTfs, 0, fname); + if (diskId >= 0) { + return diskId; + } + diskId = tfsSearch(pTfs, 0, fnameTmp); + if (diskId >= 0) { + return diskId; + } + + // alloc + int32_t disks[TFS_MAX_DISKS_PER_TIER] = {0}; + int32_t numOfVnodes = 0; + SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); + for (int32_t v = 0; v < numOfVnodes; v++) { + SVnodeObj *pVnode = ppVnodes[v]; + disks[pVnode->diskPrimary] += 1; + } + + int32_t minVal = INT_MAX; + int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0); + diskId = 0; + for (int32_t id = 0; id < ndisk; id++) { + if (minVal > disks[id]) { + minVal = disks[id]; + diskId = id; + } + } + + for (int32_t i = 0; i < numOfVnodes; ++i) { + if (ppVnodes == NULL || ppVnodes[i] == NULL) continue; + vmReleaseVnode(pMgmt, ppVnodes[i]); + } + if (ppVnodes != NULL) { + taosMemoryFree(ppVnodes); + } + + dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes); + return diskId; +} + SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; @@ -52,6 +108,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->vgId = pCfg->vgId; pVnode->vgVersion = pCfg->vgVersion; + pVnode->diskPrimary = pCfg->diskPrimary; pVnode->refCount = 0; pVnode->dropped = 0; pVnode->path = taosStrdup(pCfg->path); @@ -169,7 +226,8 @@ static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) { snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId); snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); - int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs); + int32_t diskPrimary = pCfg->diskPrimary; + int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs); if (vgId <= 0) { dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath); return -1; @@ -205,11 +263,12 @@ static void *vmOpenVnodeInThread(void *param) { pThread->updateVnodesList = true; } + int32_t diskPrimary = pCfg->diskPrimary; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); - SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { - dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); + dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr()); pThread->failed++; continue; } @@ -296,6 +355,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) { dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes); + terrno = TSDB_CODE_VND_INIT_FAILED; return -1; } @@ -518,7 +578,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { tmsgReportStartup("vnode-worker", "initialized"); if (vmOpenVnodes(pMgmt) != 0) { - dError("failed to open vnode since %s", terrstr()); + dError("failed to open all vnodes since %s", terrstr()); goto _OVER; } tmsgReportStartup("vnode-vnodes", "initialized"); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 56bff0c760..848e123448 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -16,7 +16,33 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static SDnode globalDnode = {0}; +#define STR_CASE_CMP(s, d) (0 == strcasecmp((s), (d))) +#define STR_STR_CMP(s, d) (strstr((s), (d))) +#define STR_INT_CMP(s, d, c) (taosStr2Int32(s, 0, 10) c(d)) +#define STR_STR_SIGN ("ia") +#define DM_INIT_MON() \ + do { \ + code = (int32_t)(2147483648 | 298); \ + strncpy(stName, tsVersionName, 64); \ + monCfg.maxLogs = tsMonitorMaxLogs; \ + monCfg.port = tsMonitorPort; \ + monCfg.server = tsMonitorFqdn; \ + monCfg.comp = tsMonitorComp; \ + if (monInit(&monCfg) != 0) { \ + if (terrno != 0) code = terrno; \ + goto _exit; \ + } \ + } while (0) + +#define DM_ERR_RTN(c) \ + do { \ + code = (c); \ + goto _exit; \ + } while (0) + +static SDnode globalDnode = {0}; +static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS", + "FreeBSD", "openSUSE", "SLES", "Fedora", "MacOS"}; SDnode *dmInstance() { return &globalDnode; } @@ -37,16 +63,37 @@ static int32_t dmInitSystem() { } static int32_t dmInitMonitor() { + int32_t code = 0; SMonCfg monCfg = {0}; - monCfg.maxLogs = tsMonitorMaxLogs; - monCfg.port = tsMonitorPort; - monCfg.server = tsMonitorFqdn; - monCfg.comp = tsMonitorComp; - if (monInit(&monCfg) != 0) { - dError("failed to init monitor since %s", terrstr()); - return -1; + char reName[64] = {0}; + char stName[64] = {0}; + char ver[64] = {0}; + + DM_INIT_MON(); + + if (STR_STR_CMP(stName, STR_STR_SIGN)) { + DM_ERR_RTN(0); } - return 0; + if (taosGetOsReleaseName(reName, stName, ver, 64) != 0) { + DM_ERR_RTN(code); + } + if (STR_CASE_CMP(stName, dmOS[0])) { + if (STR_INT_CMP(ver, 17, >)) { + DM_ERR_RTN(0); + } + } else if (STR_CASE_CMP(stName, dmOS[1])) { + if (STR_INT_CMP(ver, 6, >)) { + DM_ERR_RTN(0); + } + } else if (STR_STR_CMP(stName, dmOS[2]) || STR_STR_CMP(stName, dmOS[3]) || STR_STR_CMP(stName, dmOS[4]) || + STR_STR_CMP(stName, dmOS[5]) || STR_STR_CMP(stName, dmOS[6]) || STR_STR_CMP(stName, dmOS[7]) || + STR_STR_CMP(stName, dmOS[8]) || STR_STR_CMP(stName, dmOS[9])) { + DM_ERR_RTN(0); + } + +_exit: + if (code) terrno = code; + return code; } static bool dmCheckDiskSpace() { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 4d05637a2b..8ea98242f9 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -20,7 +20,6 @@ #define CLUSTER_VER_NUMBE 1 #define CLUSTER_RESERVE_SIZE 60 -char tsVersionName[16] = "community"; int64_t tsExpireTime = 0; static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index bb92bfb4c7..100513b932 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -70,6 +70,8 @@ static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter); static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); +static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue); + int32_t mndInitDnode(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_DNODE, @@ -947,7 +949,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { goto _OVER; } - mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", + mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port, dropReq.force?"true":"false", dropReq.unsafe?"true":"false"); if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { goto _OVER; @@ -987,7 +989,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs()); - + if (isonline && force) { terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE; mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(), @@ -1060,6 +1062,20 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { strcpy(dcfgReq.config, "monitor"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "keeptimeoffset", 14) == 0) { + int32_t optLen = strlen("keeptimeoffset"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag < 0 || flag > 23) { + mError("dnode:%d, failed to config keepTimeOffset since value:%d. Valid range: [0, 23]", cfgReq.dnodeId, flag); + terrno = TSDB_CODE_INVALID_CFG; + return -1; + } + + strcpy(dcfgReq.config, "keeptimeoffset"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); #ifdef TD_ENTERPRISE } else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) { int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE; @@ -1292,3 +1308,28 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +// get int32_t value from 'SMCfgDnodeReq' +static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t opLen, int32_t *pOutValue) { + terrno = 0; + if (' ' != pMCfgReq->config[opLen] && 0 != pMCfgReq->config[opLen]) { + goto _err; + } + + if (' ' == pMCfgReq->config[opLen]) { + // 'key value' + if (strlen(pMCfgReq->value) != 0) goto _err; + *pOutValue = atoi(pMCfgReq->config + opLen + 1); + } else { + // 'key' 'value' + if (strlen(pMCfgReq->value) == 0) goto _err; + *pOutValue = atoi(pMCfgReq->value); + } + + return 0; + +_err: + mError("dnode:%d, failed to config keeptimeoffset since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config); + terrno = TSDB_CODE_INVALID_CFG; + return -1; +} diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 162e75d783..d6537ef992 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1738,6 +1738,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa SSchema *pSrcSchema = &pStb->pColumns[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; + pSchema->flags = pSrcSchema->flags; pSchema->colId = pSrcSchema->colId; pSchema->bytes = pSrcSchema->bytes; } @@ -1788,6 +1789,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, SSchema *pSrcSchema = &pStb->pColumns[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; + pSchema->flags = pSrcSchema->flags; pSchema->colId = pSrcSchema->colId; pSchema->bytes = pSrcSchema->bytes; } @@ -1797,6 +1799,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, SSchema *pSrcSchema = &pStb->pTags[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; + pSchema->flags = pSrcSchema->flags; pSchema->colId = pSrcSchema->colId; pSchema->bytes = pSrcSchema->bytes; } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 679fafa28d..ac379a9f94 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -94,7 +94,7 @@ static char* mndBuildTelemetryReport(SMnode* pMnode) { tjsonAddStringToObject(pJson, "instanceId", clusterName); tjsonAddDoubleToObject(pJson, "reportVersion", 1); - if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) { + if (taosGetOsReleaseName(tmp, NULL, NULL, sizeof(tmp)) == 0) { tjsonAddStringToObject(pJson, "os", tmp); } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 5f9739ef91..f60cc2f406 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -51,12 +51,14 @@ extern const SVnodeCfg vnodeCfgDefault; int32_t vnodeInit(int32_t nthreads); void vnodeCleanup(); -int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); -int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); -int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs); -int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs); +int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs); +int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs); +int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, + int32_t diskPrimary, STfs *pTfs); +int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, + int32_t diskPrimary, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); -SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); +SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode); diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index a3d3ceab24..45faceb1ea 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -38,6 +38,8 @@ typedef struct STtlManger { SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime} SHashObj* pDirtyUids; // dirty tuid TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl> + + char* logPrefix; } STtlManger; typedef struct { @@ -77,9 +79,10 @@ typedef struct { typedef struct { tb_uid_t uid; TXN* pTxn; + int64_t ttlDays; } STtlDelTtlCtx; -int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); +int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix); void ttlMgrClose(STtlManger* pTtlMgr); int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 4e284dc788..85ef384ea9 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -88,7 +88,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool* pPool); int32_t vnodeBufPoolRecycle(SVBufPool* pPool); // vnodeOpen.c -int32_t vnodeGetPrimaryDir(const char* relPath, STfs* pTfs, char* buf, size_t bufLen); +int32_t vnodeGetPrimaryDir(const char* relPath, int32_t diskPrimary, STfs* pTfs, char* buf, size_t bufLen); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 77d3966769..0c78b2f2ef 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -93,6 +93,7 @@ typedef struct SQueryNode SQueryNode; #define VNODE_BUFPOOL_SEGMENTS 3 #define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME_TMP "vnode_tmp.json" // vnd.h typedef int32_t (*_query_reseek_func_t)(void* pQHandle); @@ -385,6 +386,7 @@ struct SVnode { SVState state; SVStatis statis; STfs* pTfs; + int32_t diskPrimary; SMsgCb msgCb; // Buffer Pool diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index e2253e26db..517d9692c7 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -41,7 +41,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { *ppMeta = NULL; // create handle - vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, path, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); offset = strlen(path); snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR); @@ -128,7 +128,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { } // open pTtlMgr ("ttlv1.idx") - ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0); + char logPrefix[128] = {0}; + sprintf(logPrefix, "vgId:%d", TD_VID(pVnode)); + ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix); if (ret < 0) { metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 670ab2643b..ad96004098 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) { } static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) { + if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; + STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn}; + if (pME->type == TSDB_CHILD_TABLE) { + ctx.ttlDays = pME->ctbEntry.ttlDays; + } else { + ctx.ttlDays = pME->ntbEntry.ttlDays; + } + return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx); } @@ -1968,7 +1976,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) { if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; STtlUpdTtlCtx ctx = {.uid = pME->uid}; - if (pME->type == TSDB_CHILD_TABLE) { ctx.ttlDays = pME->ctbEntry.ttlDays; ctx.changeTimeMs = pME->ctbEntry.btime; diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index c6cb826149..045a759fad 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -39,8 +39,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr); const char *ttlTbname = "ttl.idx"; const char *ttlV1Tbname = "ttlv1.idx"; -int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { - int ret = TSDB_CODE_SUCCESS; +int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) { + int ret = TSDB_CODE_SUCCESS; int64_t startNs = taosGetTimestampNs(); *ppTtlMgr = NULL; @@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr)); if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY; + char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1); + if (logBuffer == NULL) { + tdbOsFree(pTtlMgr); + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(logBuffer, logPrefix); + pTtlMgr->logPrefix = logBuffer; + ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { - metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno)); + metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(terrno)); tdbOsFree(pTtlMgr); return ret; } @@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { ret = ttlMgrFillCache(pTtlMgr); if (ret < 0) { - metaError("failed to fill hash since %s", tstrerror(terrno)); + metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno)); ttlMgrCleanup(pTtlMgr); return ret; } int64_t endNs = taosGetTimestampNs(); - metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), - endNs - startNs); + metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, + taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); *ppTtlMgr = pTtlMgr; return TSDB_CODE_SUCCESS; @@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) { if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS; - metaInfo("ttl mgr start upgrade"); + metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix); int64_t startNs = taosGetTimestampNs(); ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0); if (ret < 0) { - metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); + metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(terrno)); goto _out; } ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); if (ret < 0) { - metaError("failed to convert ttl index since %s", tstrerror(terrno)); + metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); if (ret < 0) { - metaError("failed to drop old ttl index since %s", tstrerror(terrno)); + metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = ttlMgrFillCache(pTtlMgr); if (ret < 0) { - metaError("failed to fill hash since %s", tstrerror(terrno)); + metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } int64_t endNs = taosGetTimestampNs(); - metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), - endNs - startNs); + metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, + taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); _out: tdbTbClose(pTtlMgr->pOldTtlIdx); pTtlMgr->pOldTtlIdx = NULL; @@ -130,11 +138,12 @@ _out: } static void ttlMgrCleanup(STtlManger *pTtlMgr) { + taosMemoryFree(pTtlMgr->logPrefix); taosHashCleanup(pTtlMgr->pTtlCache); taosHashCleanup(pTtlMgr->pDirtyUids); tdbTbClose(pTtlMgr->pTtlIdx); taosThreadRwlockDestroy(&pTtlMgr->lock); - tdbOsFree(pTtlMgr); + taosMemoryFree(pTtlMgr); } static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) { @@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) { int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry)); if (ret < 0) { - metaError("ttlMgr insert failed to update ttl cache since %s", tstrerror(terrno)); + metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry)); if (ret < 0) { - metaError("ttlMgr insert failed to update ttl dirty uids since %s", tstrerror(terrno)); + metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } @@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) { _out: ttlMgrULock(pTtlMgr); - metaDebug("ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, updCtx->uid, - updCtx->changeTimeMs, updCtx->ttlDays); + metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix, + updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays); return ret; } int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { + if (delCtx->ttlDays == 0) return 0; ttlMgrWLock(pTtlMgr); STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL}; int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry)); if (ret < 0) { - metaError("ttlMgr del failed to update ttl dirty uids since %s", tstrerror(terrno)); + metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } @@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { _out: ttlMgrULock(pTtlMgr); - metaDebug("ttl mgr delete ttl, uid: %" PRId64, delCtx->uid); + metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid); return ret; } @@ -293,6 +303,8 @@ _out: int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) { ttlMgrWLock(pTtlMgr); + int ret = 0; + STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid)); if (oldData == NULL) { goto _out; @@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs}; STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT}; - int ret = - taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry)); + ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry)); if (ret < 0) { - metaError("ttlMgr update ctime failed to update ttl cache since %s", tstrerror(terrno)); + metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry, sizeof(dirtryEntry)); if (ret < 0) { - metaError("ttlMgr update ctime failed to update ttl dirty uids since %s", tstrerror(terrno)); + metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, + tstrerror(terrno)); goto _out; } @@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime _out: ttlMgrULock(pTtlMgr); - metaDebug("ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs); + metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid, + pUpdCtimeCtx->changeTimeMs); return ret; } @@ -366,7 +379,7 @@ _out: int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { ttlMgrWLock(pTtlMgr); - metaInfo("ttl mgr flush start."); + metaInfo("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids)); int ret = -1; @@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid)); if (cacheEntry == NULL) { - metaError("ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", tstrerror(terrno), *pUid, - pEntry->type); - goto _out; + metaError("%s, ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, + tstrerror(terrno), *pUid, pEntry->type); + continue; } STtlIdxKeyV1 ttlKey; @@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays), pTxn); if (ret < 0) { - metaError("ttlMgr flush failed to flush ttl cache upsert since %s", tstrerror(terrno)); + metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } } else if (pEntry->type == ENTRY_TYPE_DEL) { ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); if (ret < 0) { - metaError("ttlMgr flush failed to flush ttl cache del since %s", tstrerror(terrno)); + metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid)); if (ret < 0) { - metaError("ttlMgr flush failed to delete ttl cache since %s", tstrerror(terrno)); + metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } } else { - metaError("ttlMgr flush failed to flush ttl cache, unknown type: %d", pEntry->type); + metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type); goto _out; } - pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter); + void *pIterTmp = pIter; + pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp); + taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)); } taosHashClear(pTtlMgr->pDirtyUids); @@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { _out: ttlMgrULock(pTtlMgr); - metaInfo("ttl mgr flush end."); + metaInfo("%s, ttl mgr flush end.", pTtlMgr->logPrefix); return ret; } @@ -426,7 +441,7 @@ _out: static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { int32_t ret = 0; - metaTrace("ttlMgr rlock %p", &pTtlMgr->lock); + metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); ret = taosThreadRwlockRdlock(&pTtlMgr->lock); @@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { static int32_t ttlMgrWLock(STtlManger *pTtlMgr) { int32_t ret = 0; - metaTrace("ttlMgr wlock %p", &pTtlMgr->lock); + metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); ret = taosThreadRwlockWrlock(&pTtlMgr->lock); @@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) { static int32_t ttlMgrULock(STtlManger *pTtlMgr) { int32_t ret = 0; - metaTrace("ttlMgr ulock %p", &pTtlMgr->lock); + metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); ret = taosThreadRwlockUnlock(&pTtlMgr->lock); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index bd06e21053..e45cbac329 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -26,7 +26,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN int32_t offset = 0; // vnode - vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pTfs, outputName, TSDB_FILENAME_LEN); offset = strlen(outputName); // rsma diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ec9a796cfd..bbdd98e356 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1137,14 +1137,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // wait for the stream task get ready for scan history data while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId, - pTask->info.taskLevel, pId); + tqDebug( + "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", + pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); taosMsleep(100); } // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId, + tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, pId); // if it's an source task, extract the last version in wal. diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index cbb8e46b65..483f9bc4d6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -61,7 +61,7 @@ typedef struct { static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { SVnode *pVnode = pTsdb->pVnode; - vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, path, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); int32_t offset = strlen(path); snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP); @@ -1928,6 +1928,10 @@ static void clearLastFileSet(SFSNextRowIter *state) { tBlockDataDestroy(state->pBlockData); state->pBlockData = NULL; } + + if (state->pFileReader) { + tsdbDataFileReaderClose(&state->pFileReader); + } } static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, @@ -2004,7 +2008,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie goto _err; } - for (int i = TARRAY2_SIZE(pBlkArray); i >= 0; --i) { + for (int i = TARRAY2_SIZE(pBlkArray) - 1; i >= 0; --i) { SBrinBlk *pBrinBlk = &pBlkArray->data[i]; if (state->suid >= pBrinBlk->minTbid.suid && state->suid <= pBrinBlk->maxTbid.suid) { if (state->uid >= pBrinBlk->minTbid.uid && state->uid <= pBrinBlk->maxTbid.uid) { @@ -2019,11 +2023,15 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie int indexSize = TARRAY_SIZE(state->pIndexList); if (indexSize <= 0) { // goto next fileset + clearLastFileSet(state); + goto _next_fileset; } state->state = SFSNEXTROW_INDEXLIST; state->iBrinIndex = indexSize; } else { // empty fileset, goto next fileset + // clearLastFileSet(state); + goto _next_fileset; } } @@ -2031,6 +2039,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie SBrinBlk *pBrinBlk = NULL; _next_brinindex: if (--state->iBrinIndex < 0) { // no index left, goto next fileset + clearLastFileSet(state); + goto _next_fileset; } else { pBrinBlk = taosArrayGet(state->pIndexList, state->iBrinIndex); } @@ -2159,28 +2169,9 @@ int32_t clearNextRowFromFS(void *iter) { if (!state) { return code; } - /* - if (state->pDataFReader) { - tsdbDataFReaderClose(&state->pDataFReader); - state->pDataFReader = NULL; - } - if (state->aBlockIdx) { - // taosArrayDestroy(state->aBlockIdx); - tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle); - state->aBlockIdxHandle = NULL; - state->aBlockIdx = NULL; - } - if (state->pBlockData) { - // tBlockDataDestroy(&state->blockData, 1); - tBlockDataDestroy(state->pBlockData); - state->pBlockData = NULL; - } + clearLastFileSet(state); - if (state->blockMap.pData != NULL) { - tMapDataClear(&state->blockMap); - } -*/ return code; } @@ -2634,44 +2625,9 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->pTsdb = pTsdb; pIter->pMemDelData = NULL; + loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); -#if 0 - pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); - SDelFile *pDelFile = pReadSnap->fs.pDelFile; - if (pDelFile) { - SDelFReader *pDelFReader; - - code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb); - if (code) goto _err; - - SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx)); - - code = tsdbReadDelIdx(pDelFReader, pDelIdxArray); - if (code) { - taosArrayDestroy(pDelIdxArray); - tsdbDelFReaderClose(&pDelFReader); - goto _err; - } - - SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ); - - code = getTableDelSkyline(pMem, pIMem, pDelFReader, delIdx, pIter->pSkyline); - if (code) { - taosArrayDestroy(pDelIdxArray); - tsdbDelFReaderClose(&pDelFReader); - goto _err; - } - - taosArrayDestroy(pDelIdxArray); - tsdbDelFReaderClose(&pDelFReader); - } else { - code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pIter->pSkyline); - if (code) goto _err; - } - - pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; -#endif pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; pIter->fsState.pRowIter = pIter; @@ -2683,13 +2639,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsState.suid = suid; pIter->fsState.uid = uid; pIter->fsState.lastTs = lastTs; + pIter->fsState.pr = pr; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL}; - /* -pIter->input[2] = (TsdbNextRowState){ - &pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast}; - */ pIter->input[2] = (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; @@ -2717,7 +2670,7 @@ _err: static int32_t nextRowIterClose(CacheNextRowIter *pIter) { int code = 0; - for (int i = 0; i < 4; ++i) { + for (int i = 0; i < 3; ++i) { if (pIter->input[i].nextRowClearFn) { pIter->input[i].nextRowClearFn(pIter->input[i].iter); } @@ -2753,10 +2706,10 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI } } - if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) { + if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop) { *ppRow = NULL; - *pIgnoreEarlierTs = (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || - pIter->input[2].ignoreEarlierTs || pIter->input[3].ignoreEarlierTs); + *pIgnoreEarlierTs = + (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || pIter->input[2].ignoreEarlierTs); return code; } @@ -2766,7 +2719,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI int nMax = 0; TSKEY maxKey = TSKEY_MIN; - for (int i = 0; i < 4; ++i) { + for (int i = 0; i < 3; ++i) { if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) { TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow); @@ -2798,9 +2751,11 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI uint64_t uid = pIter->idx.uid; STableLoadInfo *pInfo = *(STableLoadInfo **)tSimpleHashGet(pIter->pr->pTableMap, &uid, sizeof(uid)); SArray *pTombData = pInfo->pTombData; - taosArrayAddAll(pTombData, pIter->pMemDelData); + if (pTombData) { + taosArrayAddAll(pTombData, pIter->pMemDelData); - code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline); + code = tsdbBuildDeleteSkyline(pTombData, 0, (int32_t)(TARRAY_SIZE(pTombData) - 1), pIter->pSkyline); + } pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 87983030c0..a74c38211d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -185,7 +185,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, for (int32_t i = 0; i < numOfTables; ++i) { uint64_t uid = p->pTableList[i].uid; p->uidList[i] = uid; - STableLoadInfo* pInfo = taosMemoryMalloc(sizeof(STableLoadInfo)); + STableLoadInfo* pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); tSimpleHashPut(p->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 0971d9d274..ec116c717e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -276,14 +276,14 @@ void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { // CURRENT if (current) { - vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current, TSDB_FILENAME_LEN); offset = strlen(current); snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT", TD_DIRSEP); } // CURRENT.t if (current_t) { - vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current_t, TSDB_FILENAME_LEN); offset = strlen(current_t); snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT.t", TD_DIRSEP); } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index ff38d4cc00..1a0104945f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -72,7 +72,7 @@ static int32_t destroy_fs(STFileSystem **fs) { int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) { int32_t offset = 0; - vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->diskPrimary, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN); offset = strlen(fname); snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, gCurrentFname[ftype]); diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 8577a42417..9ff4b28779 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -284,8 +284,9 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { // SDelFile =============================================== void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { int32_t offset = 0; + SVnode *pVnode = pTsdb->pVnode; - vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, fname, TSDB_FILENAME_LEN); offset = strlen(fname); snprintf((char *)fname + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%dver%" PRId64 ".del", TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e89f0996a9..33c6f9d533 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -295,7 +295,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { pInfo->txn = metaGetTxn(pVnode->pMeta); // save info - vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode)); if (vnodeSaveInfo(dir, &pInfo->info) < 0) { @@ -433,7 +433,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { return -1; } - vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN); syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed); @@ -496,7 +496,7 @@ bool vnodeShouldRollback(SVnode *pVnode) { char tFName[TSDB_FILENAME_LEN] = {0}; int32_t offset = 0; - vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); offset = strlen(tFName); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); @@ -507,7 +507,7 @@ void vnodeRollback(SVnode *pVnode) { char tFName[TSDB_FILENAME_LEN] = {0}; int32_t offset = 0; - vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); offset = strlen(tFName); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 541c695ba0..3b07ef9ea9 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -15,9 +15,11 @@ #include "vnd.h" -int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) { +int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) { if (pTfs) { - snprintf(buf, bufLen - 1, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath); + SDiskID diskId = {0}; + diskId.id = diskPrimary; + snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath); } else { snprintf(buf, bufLen - 1, "%s", relPath); } @@ -25,7 +27,15 @@ int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bu return 0; } -int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { +static int32_t vnodeMkDir(STfs *pTfs, const char *path) { + if (pTfs) { + return tfsMkdirRecur(pTfs, path); + } else { + return taosMkDir(path); + } +} + +int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; @@ -36,10 +46,11 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { } // create vnode env - vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN); - if (taosMkDir(dir)) { + if (vnodeMkDir(pTfs, path)) { + vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(errno), path); return TAOS_SYSTEM_ERROR(errno); } + vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN); if (pCfg) { info.config = *pCfg; @@ -60,12 +71,12 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { return 0; } -int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { +int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; int32_t ret = 0; - vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN); ret = vnodeLoadInfo(dir, &info); if (ret < 0) { @@ -133,7 +144,8 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) { return strlen(tmp); } -int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { +int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, + int32_t diskPrimary, STfs *pTfs) { int32_t ret = 0; char oldRname[TSDB_FILENAME_LEN] = {0}; @@ -164,7 +176,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos); vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname); - ret = tfsRename(pTfs, tsdbFile->rname, newRname); + ret = tfsRename(pTfs, diskPrimary, tsdbFile->rname, newRname); if (ret != 0) { vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr()); tfsClosedir(tsdbDir); @@ -176,19 +188,20 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr tfsClosedir(tsdbDir); vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath); - ret = tfsRename(pTfs, srcPath, dstPath); + ret = tfsRename(pTfs, diskPrimary, srcPath, dstPath); if (ret != 0) { vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr()); } return ret; } -int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) { +int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, + int32_t diskPrimary, STfs *pTfs) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; int32_t ret = 0; - vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN); ret = vnodeLoadInfo(dir, &info); if (ret < 0) { @@ -232,7 +245,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod } vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath); - ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs); + ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs); if (ret < 0) { vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath, tstrerror(terrno)); @@ -243,11 +256,12 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod return 0; } -int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { +int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, + int32_t diskPrimary, STfs *pTfs) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; - vnodeGetPrimaryDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN); if (vnodeLoadInfo(dir, &info) == 0) { if (info.config.vgId != dstVgId) { vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId); @@ -256,7 +270,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s return dstVgId; } - vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN); if (vnodeLoadInfo(dir, &info) < 0) { vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno)); return -1; @@ -271,7 +285,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s } vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath); - if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs) < 0) { + if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) { vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno)); return -1; } @@ -284,14 +298,31 @@ void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); } -SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { +static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) { + int32_t ndisk = 1; + if (pTfs) { + ndisk = tfsGetDisksAtLevel(pTfs, 0); + } + if (diskPrimary < 0 || diskPrimary >= ndisk) { + vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk); + terrno = TSDB_CODE_FS_INVLD_CFG; + return -1; + } + return 0; +} + +SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb) { SVnode *pVnode = NULL; SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; char tdir[TSDB_FILENAME_LEN * 2] = {0}; int32_t ret = 0; - vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN); + if (vnodeCheckDisk(diskPrimary, pTfs)) { + vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary); + return NULL; + } + vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN); info.config = vnodeCfgDefault; @@ -334,6 +365,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.applied = info.state.committed; pVnode->state.applyTerm = info.state.commitTerm; pVnode->pTfs = pTfs; + pVnode->diskPrimary = diskPrimary; pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); pVnode->blocked = false; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index c80792490a..d559783c2f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -86,6 +86,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t code = 0; + SVnode *pVnode = pReader->pVnode; // CONFIG ============== // FIXME: if commit multiple times and the config changed? @@ -93,7 +94,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) char fName[TSDB_FILENAME_LEN]; int32_t offset = 0; - vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN); offset = strlen(fName); snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME); @@ -343,7 +344,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * .applyTerm = pWriter->info.state.commitTerm}; pVnode->statis = pWriter->info.statis; char dir[TSDB_FILENAME_LEN] = {0}; - vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeCommitInfo(dir); } else { @@ -381,7 +382,7 @@ _exit: static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { int32_t code = 0; - + SVnode *pVnode = pWriter->pVnode; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; // decode info @@ -395,10 +396,9 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_ // modify info as needed char dir[TSDB_FILENAME_LEN] = {0}; - vnodeGetPrimaryDir(pWriter->pVnode->path, pWriter->pVnode->pTfs, dir, TSDB_FILENAME_LEN); + vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN); SVnodeStats vndStats = pWriter->info.config.vndStats; - SVnode *pVnode = pWriter->pVnode; pWriter->info.config = pVnode->config; pWriter->info.config.vndStats = vndStats; vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index ecda1d596a..54352d0a53 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -615,6 +615,31 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg* if (pCfg->ttl > 0) { *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " TTL %d", pCfg->ttl); } + + if (TSDB_SUPER_TABLE == pCfg->tableType || TSDB_NORMAL_TABLE == pCfg->tableType) { + int32_t nSma = 0; + for (int32_t i = 0; i < pCfg->numOfColumns; ++i) { + if (IS_BSMA_ON(pCfg->pSchemas + i)) { + ++nSma; + } + } + + if (nSma < pCfg->numOfColumns) { + bool smaOn = false; + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " SMA("); + for (int32_t i = 0; i < pCfg->numOfColumns; ++i) { + if (IS_BSMA_ON(pCfg->pSchemas + i)) { + if (smaOn) { + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ",`%s`", (pCfg->pSchemas + i)->name); + } else { + smaOn = true; + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "`%s`", (pCfg->pSchemas + i)->name); + } + } + } + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ")"); + } + } } static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* pDbCfg, char* tbName, STableCfg* pCfg) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c4111ded92..51da73d4d7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4036,7 +4036,7 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { bool compareStateKey(void* data, void* key) { if (!data || !key) { - return true; + return false; } SStateKeys* stateKey = (SStateKeys*)key; stateKey->pData = (char*)key + sizeof(SStateKeys); @@ -4062,7 +4062,13 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { code = TSDB_CODE_FAILED; releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); - pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size); + pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size); + pCurWin->pStateKey = + (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pCurWin->pStateKey->type = pAggSup->stateKeyType; + pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); + pCurWin->pStateKey->isNull = false; } if (code == TSDB_CODE_SUCCESS) { @@ -4076,11 +4082,19 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; - pNextWin->winInfo.pOutputBuf = NULL; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); - code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0); + SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); + int32_t nextSize = pAggSup->resultRowSize; + code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, &pNextWin->winInfo.pOutputBuf, &nextSize); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(pNextWin->winInfo); + } else { + pNextWin->pStateKey = + (SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pNextWin->pStateKey->type = pAggSup->stateKeyType; + pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys); + pNextWin->pStateKey->isNull = false; + pNextWin->winInfo.isOutput = true; } pAggSup->stateStore.streamStateFreeCur(pCur); } @@ -4156,6 +4170,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SStateWindowInfo curWin = {0}; SStateWindowInfo nextWin = {0}; setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); + if (IS_VALID_SESSION_WIN(nextWin.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextWin.winInfo.pOutputBuf, &pAPI->stateStore); + } setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, pAggSup->pResultRows, pSeUpdated, pStDeleted); @@ -4346,9 +4363,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; SStateWindowInfo nextInfo = {0}; + SStateWindowInfo dummy = {0}; setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted); + saveResult(curInfo.winInfo, pInfo->pStUpdated); + } + + if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + } + + if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); } } taosMemoryFree(pBuf); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0f2281ea73..f51efb23d1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -201,6 +201,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (left == 0) { taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; + pTask->status.downstreamReady = 1; if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 4383c46c17..8adaab91a1 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -113,6 +113,15 @@ SDiskSize tfsGetSize(STfs *pTfs) { return size; } +int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level) { + if (level < 0 || level >= pTfs->nlevel) { + return 0; + } + + STfsTier *pTier = TFS_TIER_AT(pTfs, level); + return pTier->ndisk; +} + int32_t tfsGetLevel(STfs *pTfs) { return pTfs->nlevel; } int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) { @@ -272,6 +281,20 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) { return 0; } +int32_t tfsMkdirRecur(STfs *pTfs, const char *rname) { + for (int32_t level = 0; level < pTfs->nlevel; level++) { + STfsTier *pTier = TFS_TIER_AT(pTfs, level); + for (int32_t id = 0; id < pTier->ndisk; id++) { + SDiskID did = {.id = id, .level = level}; + if (tfsMkdirRecurAt(pTfs, rname, did) < 0) { + return -1; + } + } + } + + return 0; +} + int32_t tfsMkdir(STfs *pTfs, const char *rname) { for (int32_t level = 0; level < pTfs->nlevel; level++) { STfsTier *pTier = TFS_TIER_AT(pTfs, level); @@ -314,25 +337,60 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { return 0; } -int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) { +static int32_t tfsRenameAt(STfs *pTfs, SDiskID diskId, const char *orname, const char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; + int32_t level = diskId.level; + int32_t id = diskId.id; + STfsTier *pTier = TFS_TIER_AT(pTfs, level); + STfsDisk *pDisk = pTier->disks[id]; + snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname); + snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname); + + if (taosRenameFile(oaname, naname) != 0 && errno != ENOENT) { + terrno = TAOS_SYSTEM_ERROR(errno); + fError("failed to rename %s to %s since %s", oaname, naname, terrstr()); + return -1; + } + + return 0; +} + +int32_t tfsRename(STfs *pTfs, int32_t diskPrimary, const char *orname, const char *nrname) { for (int32_t level = pTfs->nlevel - 1; level >= 0; level--) { STfsTier *pTier = TFS_TIER_AT(pTfs, level); for (int32_t id = pTier->ndisk - 1; id >= 0; id--) { - STfsDisk *pDisk = pTier->disks[id]; - snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname); - snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname); - if (taosRenameFile(oaname, naname) != 0 && errno != ENOENT) { - terrno = TAOS_SYSTEM_ERROR(errno); - fError("failed to rename %s to %s since %s", oaname, naname, terrstr()); + if (level == 0 && id == diskPrimary) { + continue; + } + + SDiskID diskId = {.level = level, .id = id}; + if (tfsRenameAt(pTfs, diskId, orname, nrname)) { return -1; } } } - return 0; + SDiskID diskId = {.level = 0, .id = diskPrimary}; + return tfsRenameAt(pTfs, diskId, orname, nrname); +} + +int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname) { + if (level < 0 || level >= pTfs->nlevel) { + return -1; + } + char path[TMPNAME_LEN] = {0}; + STfsTier *pTier = TFS_TIER_AT(pTfs, level); + + for (int32_t id = 0; id < pTier->ndisk; id++) { + STfsDisk *pDisk = pTier->disks[id]; + snprintf(path, TMPNAME_LEN - 1, "%s%s%s", pDisk->path, TD_DIRSEP, fname); + if (taosCheckExistFile(path)) { + return id; + } + } + return -1; } STfsDir *tfsOpendir(STfs *pTfs, const char *rname) { diff --git a/source/libs/tfs/test/tfsTest.cpp b/source/libs/tfs/test/tfsTest.cpp index df37630fd7..9bbf6bc729 100644 --- a/source/libs/tfs/test/tfsTest.cpp +++ b/source/libs/tfs/test/tfsTest.cpp @@ -156,7 +156,7 @@ TEST_F(TfsTest, 03_Dir) { EXPECT_NE(taosDirExist(ap4), 1); EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0); EXPECT_EQ(taosDirExist(ap4), 1); - EXPECT_EQ(tfsRename(pTfs, p44, p45), 0); + EXPECT_EQ(tfsRename(pTfs, 0, p44, p45), 0); EXPECT_EQ(tfsRmdir(pTfs, p4), 0); EXPECT_NE(taosDirExist(ap4), 1); @@ -609,7 +609,7 @@ TEST_F(TfsTest, 05_MultiDisk) { EXPECT_NE(taosDirExist(_ap22), 1); EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0); EXPECT_EQ(taosDirExist(_ap22), 1); - EXPECT_EQ(tfsRename(pTfs, p44, p45), 0); + EXPECT_EQ(tfsRename(pTfs, 0, p44, p45), 0); EXPECT_EQ(tfsRmdir(pTfs, p4), 0); EXPECT_NE(taosDirExist(_ap22), 1); } @@ -721,4 +721,4 @@ TEST_F(TfsTest, 05_MultiDisk) { } tfsClose(pTfs); -} \ No newline at end of file +} diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 84004ed3c1..6f87f6b75b 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -327,17 +327,19 @@ bool getWinVersionReleaseName(char *releaseName, int32_t maxLen) { } #endif -int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) { +int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t maxLen) { #ifdef WINDOWS if (!getWinVersionReleaseName(releaseName, maxLen)) { snprintf(releaseName, maxLen, "Windows"); } + if(sName) snprintf(sName, maxLen, "Windows"); return 0; #elif defined(_TD_DARWIN_64) char osversion[32]; size_t osversion_len = sizeof(osversion) - 1; int osversion_name[] = { CTL_KERN, KERN_OSRELEASE }; + if(sName) snprintf(sName, maxLen, "macOS"); if (sysctl(osversion_name, 2, osversion, &osversion_len, NULL, 0) == -1) { return -1; } @@ -357,24 +359,35 @@ int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen) { return 0; #else char line[1024]; + char *dest = NULL; size_t size = 0; int32_t code = -1; + int32_t cnt = 0; TdFilePtr pFile = taosOpenFile("/etc/os-release", TD_FILE_READ | TD_FILE_STREAM); - if (pFile == NULL) return false; + if (pFile == NULL) return code; while ((size = taosGetsFile(pFile, sizeof(line), line)) != -1) { line[size - 1] = '\0'; - if (strncmp(line, "PRETTY_NAME", 11) == 0) { - const char *p = strchr(line, '=') + 1; - if (*p == '"') { - p++; - line[size - 2] = 0; - } - tstrncpy(releaseName, p, maxLen); + if (strncmp(line, "NAME", 4) == 0) { + dest = sName; + } else if (strncmp(line, "PRETTY_NAME", 11) == 0) { + dest = releaseName; code = 0; - break; + } else if (strncmp(line, "VERSION_ID", 10) == 0) { + dest = ver; + } else { + continue; } + if (!dest) continue; + const char *p = strchr(line, '=') + 1; + if (*p == '"') { + p++; + line[size - 2] = 0; + } + tstrncpy(dest, p, maxLen); + + if (++cnt >= 3) break; } taosCloseFile(&pFile); diff --git a/source/os/test/osTests.cpp b/source/os/test/osTests.cpp index 1d6542e78c..a2ccc4de02 100644 --- a/source/os/test/osTests.cpp +++ b/source/os/test/osTests.cpp @@ -37,7 +37,7 @@ TEST(osTest, osSystem) { const int sysLen = 64; char osSysName[sysLen]; - int ret = taosGetOsReleaseName(osSysName, sysLen); + int ret = taosGetOsReleaseName(osSysName, NULL, NULL, sysLen); printf("os systeme name:%s\n", osSysName); ASSERT_EQ(ret, 0); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d043d22445..8f5fbc0844 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE, "Please use this comma // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INIT_FAILED, "Vnode init failure") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist") TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist")