Merge branch 'main' of https://github.com/taosdata/TDengine into 3.0.3.00302

This commit is contained in:
Hongze Cheng 2023-03-02 19:47:22 +08:00
commit ed48296acd
54 changed files with 531 additions and 1088 deletions

View File

@ -929,12 +929,19 @@ typedef struct {
int32_t minRows; int32_t minRows;
int32_t maxRows; int32_t maxRows;
int32_t walFsyncPeriod; int32_t walFsyncPeriod;
int16_t hashPrefix;
int16_t hashSuffix;
int8_t walLevel; int8_t walLevel;
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t replications; int8_t replications;
int8_t strict; int8_t strict;
int8_t cacheLast; int8_t cacheLast;
int32_t tsdbPageSize;
int32_t walRetentionPeriod;
int32_t walRollPeriod;
int64_t walRetentionSize;
int64_t walSegmentSize;
int32_t numOfRetensions; int32_t numOfRetensions;
SArray* pRetensions; SArray* pRetensions;
int8_t schemaless; int8_t schemaless;

View File

@ -265,6 +265,7 @@ typedef struct SShowStmt {
typedef struct SShowCreateDatabaseStmt { typedef struct SShowCreateDatabaseStmt {
ENodeType type; ENodeType type;
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
void* pCfg; // SDbCfgInfo void* pCfg; // SDbCfgInfo
} SShowCreateDatabaseStmt; } SShowCreateDatabaseStmt;

View File

@ -116,6 +116,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) //
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) //

View File

@ -46,9 +46,9 @@ rd /s /Q C:\TDengine
cmake --install . cmake --install .
if not %errorlevel% == 0 ( call :RUNFAILED build x64 failed & exit /b 1) if not %errorlevel% == 0 ( call :RUNFAILED build x64 failed & exit /b 1)
cd %package_dir% cd %package_dir%
iscc /DMyAppInstallName="%packagServerName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release iscc /DMyAppInstallName="%packagServerName_x64%" /DMyAppVersion="%2" /DCusName="TDengine" /DCusPrompt="taos" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x64% failed & exit /b 1) if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x64% failed & exit /b 1)
iscc /DMyAppInstallName="%packagClientName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release iscc /DMyAppInstallName="%packagClientName_x64%" /DMyAppVersion="%2" /DCusName="TDengine" /DCusPrompt="taos" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x64% failed & exit /b 1) if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x64% failed & exit /b 1)
goto EXIT0 goto EXIT0

View File

@ -89,7 +89,7 @@ else
${build_dir}/bin/tdengine-datasource.zip \ ${build_dir}/bin/tdengine-datasource.zip \
${build_dir}/bin/tdengine-datasource.zip.md5sum" ${build_dir}/bin/tdengine-datasource.zip.md5sum"
[ -f ${build_dir}/bin/taosx ] && taosx_bin="${build_dir}/bin/taosx" [ -f ${build_dir}/bin/taosx ] && taosx_bin="${build_dir}/bin/taosx"
explorer_bin_files=$(sh -c "ls ${build_dir}/bin/*-explorer") explorer_bin_files=$(find ${build_dir}/bin/ -name '*-explorer')
bin_files="${build_dir}/bin/${serverName} \ bin_files="${build_dir}/bin/${serverName} \
${build_dir}/bin/${clientName} \ ${build_dir}/bin/${clientName} \

View File

@ -66,7 +66,8 @@ enum {
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
typedef struct { typedef struct {
char* key; char* key;
int32_t idx;
// statistics // statistics
int32_t reportCnt; int32_t reportCnt;
int32_t connKeyCnt; int32_t connKeyCnt;

View File

@ -303,8 +303,12 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
} }
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (0 == atomic_load_8(&clientHbMgr.inited)) {
goto _return;
}
static int32_t emptyRspNum = 0; static int32_t emptyRspNum = 0;
char *key = (char *)param; int32_t idx = *(int32_t *)param;
SClientHbBatchRsp pRsp = {0}; SClientHbBatchRsp pRsp = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
@ -319,22 +323,24 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
int32_t rspNum = taosArrayGetSize(pRsp.rsps); int32_t rspNum = taosArrayGetSize(pRsp.rsps);
taosThreadMutexLock(&appInfo.mutex); taosThreadMutexLock(&clientHbMgr.lock);
SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
if (pInst == NULL || NULL == *pInst) { if (pAppHbMgr == NULL) {
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&clientHbMgr.lock);
tscError("cluster not exist, key:%s", key); tscError("appHbMgr not exist, idx:%d", idx);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
return -1; return -1;
} }
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
if (code != 0) { if (code != 0) {
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1); pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes,
(*pInst)->totalDnodes); pInst->totalDnodes);
} }
if (rspNum) { if (rspNum) {
@ -346,15 +352,17 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
for (int32_t i = 0; i < rspNum; ++i) { for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
code = (*clientHbMgr.rspHandle[rsp->connKey.connType])((*pInst)->pAppHbMgr, rsp); code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
if (code) { if (code) {
break; break;
} }
} }
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&clientHbMgr.lock);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
_return:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
return code; return code;
@ -788,7 +796,8 @@ static void *hbThreadFunc(void *param) {
pInfo->msgInfo.pData = buf; pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen; pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT; pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = taosStrdup(pAppHbMgr->key); pInfo->param = taosMemoryMalloc(sizeof(int32_t));
*(int32_t *)pInfo->param = i;
pInfo->paramFreeFp = taosMemoryFree; pInfo->paramFreeFp = taosMemoryFree;
pInfo->requestId = generateRequestId(); pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0; pInfo->requestObjRefId = 0;
@ -874,6 +883,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
taosThreadMutexLock(&clientHbMgr.lock); taosThreadMutexLock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
pAppHbMgr->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
return pAppHbMgr; return pAppHbMgr;

View File

@ -2003,7 +2003,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
switch (pColInfoData->info.type) { switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
memset(pBuf, 0, sizeof(pBuf)); memset(pBuf, 0, sizeof(pBuf));
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf); len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
break; break;

View File

@ -41,8 +41,8 @@ bool tsPrintAuth = false;
// queue & threads // queue & threads
int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 5000; int32_t tsNumOfRpcSessions = 6000;
int32_t tsTimeToGetAvailableConn = 100000; int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4;
@ -521,7 +521,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(tsCfg, "numOfRpcSessions"); pItem = cfgGetItem(tsCfg, "numOfRpcSessions");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfRpcSessions = 2000;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
pItem->i32 = tsNumOfRpcSessions; pItem->i32 = tsNumOfRpcSessions;
pItem->stype = stype; pItem->stype = stype;
@ -529,7 +528,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn"); pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsTimeToGetAvailableConn = 1000;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
pItem->i32 = tsTimeToGetAvailableConn; pItem->i32 = tsTimeToGetAvailableConn;
pItem->stype = stype; pItem->stype = stype;

View File

@ -2865,12 +2865,19 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1; if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1; if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->walFsyncPeriod) < 0) return -1; if (tEncodeI32(&encoder, pRsp->walFsyncPeriod) < 0) return -1;
if (tEncodeI16(&encoder, pRsp->hashPrefix) < 0) return -1;
if (tEncodeI16(&encoder, pRsp->hashSuffix) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1; if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1; if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1; if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->strict) < 0) return -1; if (tEncodeI8(&encoder, pRsp->strict) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->cacheLast) < 0) return -1; if (tEncodeI8(&encoder, pRsp->cacheLast) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->tsdbPageSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->walRollPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->walRetentionSize) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->walSegmentSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1; if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) { for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i); SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i);
@ -2905,12 +2912,19 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->walFsyncPeriod) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->walFsyncPeriod) < 0) return -1;
if (tDecodeI16(&decoder, &pRsp->hashPrefix) < 0) return -1;
if (tDecodeI16(&decoder, &pRsp->hashSuffix) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->cacheLast) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->cacheLast) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tsdbPageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->walRollPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->walRetentionSize) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->walSegmentSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1;
if (pRsp->numOfRetensions > 0) { if (pRsp->numOfRetensions > 0) {
pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention)); pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention));

View File

@ -192,8 +192,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg); taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case WRITE_QUEUE: case WRITE_QUEUE:
if (!osDataSpaceAvailable()) { if (!osDataSpaceSufficient()) {
terrno = TSDB_CODE_NO_DISKSPACE; terrno = TSDB_CODE_NO_ENOUGH_DISKSPACE;
code = terrno; code = terrno;
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
break; break;

View File

@ -2,8 +2,9 @@ aux_source_directory(src MNODE_SRC)
IF (TD_PRIVILEGE) IF (TD_PRIVILEGE)
ADD_DEFINITIONS(-D_PRIVILEGE) ADD_DEFINITIONS(-D_PRIVILEGE)
ENDIF () ENDIF ()
IF (TD_PRIVILEGE) IF (TD_ENTERPRISE)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c) LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDb.c)
ENDIF () ENDIF ()
add_library(mnode STATIC ${MNODE_SRC}) add_library(mnode STATIC ${MNODE_SRC})

View File

@ -33,6 +33,8 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
SSdbRaw *mndDbActionEncode(SDbObj *pDb); SSdbRaw *mndDbActionEncode(SDbObj *pDb);
const char *mndGetDbStr(const char *src); const char *mndGetDbStr(const char *src);
int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -41,12 +41,15 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
static int32_t mndProcessDropDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
static int32_t mndProcessUseDbReq(SRpcMsg *pReq); static int32_t mndProcessUseDbReq(SRpcMsg *pReq);
static int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
static int32_t mndProcessTrimDbReq(SRpcMsg *pReq); static int32_t mndProcessTrimDbReq(SRpcMsg *pReq);
static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity); static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq); static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq);
#ifndef TD_ENTERPRISE
int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; }
#endif
int32_t mndInitDb(SMnode *pMnode) { int32_t mndInitDb(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
.sdbType = SDB_DB, .sdbType = SDB_DB,
@ -884,12 +887,19 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
cfgRsp.minRows = pDb->cfg.minRows; cfgRsp.minRows = pDb->cfg.minRows;
cfgRsp.maxRows = pDb->cfg.maxRows; cfgRsp.maxRows = pDb->cfg.maxRows;
cfgRsp.walFsyncPeriod = pDb->cfg.walFsyncPeriod; cfgRsp.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
cfgRsp.hashPrefix = pDb->cfg.hashPrefix;
cfgRsp.hashSuffix = pDb->cfg.hashSuffix;
cfgRsp.walLevel = pDb->cfg.walLevel; cfgRsp.walLevel = pDb->cfg.walLevel;
cfgRsp.precision = pDb->cfg.precision; cfgRsp.precision = pDb->cfg.precision;
cfgRsp.compression = pDb->cfg.compression; cfgRsp.compression = pDb->cfg.compression;
cfgRsp.replications = pDb->cfg.replications; cfgRsp.replications = pDb->cfg.replications;
cfgRsp.strict = pDb->cfg.strict; cfgRsp.strict = pDb->cfg.strict;
cfgRsp.cacheLast = pDb->cfg.cacheLast; cfgRsp.cacheLast = pDb->cfg.cacheLast;
cfgRsp.tsdbPageSize = pDb->cfg.tsdbPageSize;
cfgRsp.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
cfgRsp.walRollPeriod = pDb->cfg.walRollPeriod;
cfgRsp.walRetentionSize = pDb->cfg.walRetentionSize;
cfgRsp.walSegmentSize = pDb->cfg.walSegmentSize;
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions; cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions; cfgRsp.pRetensions = pDb->cfg.pRetensions;
cfgRsp.schemaless = pDb->cfg.schemaless; cfgRsp.schemaless = pDb->cfg.schemaless;
@ -1395,98 +1405,6 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
return 0; return 0;
} }
static int32_t mndSetCompactDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
SDbObj dbObj = {0};
memcpy(&dbObj, pDb, sizeof(SDbObj));
dbObj.compactStartTime = compactTs;
SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndSetCompactDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (mndVgroupInDb(pVgroup, pDb->uid)) {
if (mndBuildCompactVgroupAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
int64_t compactTs = taosGetTimestampMs();
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "compact-db");
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to compact db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetCompactDbCommitLogs(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
if (mndSetCompactDbRedoActions(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
return code;
}
static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SDbObj *pDb = NULL;
SCompactDbReq compactReq = {0};
if (tDeserializeSCompactDbReq(pReq->pCont, pReq->contLen, &compactReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("db:%s, start to compact", compactReq.db);
pDb = mndAcquireDb(pMnode, compactReq.db);
if (pDb == NULL) {
goto _OVER;
}
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
goto _OVER;
}
code = mndCompactDb(pMnode, pReq, pDb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr());
}
mndReleaseDb(pMnode, pDb);
return code;
}
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;

View File

@ -344,8 +344,8 @@ static int32_t mndInitWal(SMnode *pMnode) {
.fsyncPeriod = 0, .fsyncPeriod = 0,
.rollPeriod = -1, .rollPeriod = -1,
.segSize = -1, .segSize = -1,
.retentionPeriod = -1, .retentionPeriod = 0,
.retentionSize = -1, .retentionSize = 0,
.level = TAOS_WAL_FSYNC, .level = TAOS_WAL_FSYNC,
}; };
@ -370,7 +370,6 @@ static int32_t mndInitSdb(SMnode *pMnode) {
opt.path = pMnode->path; opt.path = pMnode->path;
opt.pMnode = pMnode; opt.pMnode = pMnode;
opt.pWal = pMnode->pWal; opt.pWal = pMnode->pWal;
opt.sync = pMnode->syncMgmt.sync;
pMnode->pSdb = sdbInit(&opt); pMnode->pSdb = sdbInit(&opt);
if (pMnode->pSdb == NULL) { if (pMnode->pSdb == NULL) {
@ -552,16 +551,7 @@ void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
syncPreStop(pMnode->syncMgmt.sync); syncPreStop(pMnode->syncMgmt.sync);
#if 0 sdbWriteFile(pMnode->pSdb, 0);
while (syncSnapshotRecving(pMnode->syncMgmt.sync)) {
mInfo("vgId:1, snapshot is recving");
taosMsleep(300);
}
while (syncSnapshotSending(pMnode->syncMgmt.sync)) {
mInfo("vgId:1, snapshot is sending");
taosMsleep(300);
}
#endif
} }
} }

View File

@ -118,12 +118,12 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
transId, pTrans->createdTime, pMgmt->transId); transId, pTrans->createdTime, pMgmt->transId);
mndTransExecute(pMnode, pTrans, false); mndTransExecute(pMnode, pTrans, false);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
} else { } else {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
} }
} }
sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
return 0; return 0;
} }
@ -319,6 +319,7 @@ int32_t mndInitSync(SMnode *pMnode) {
mError("failed to open sync since %s", terrstr()); mError("failed to open sync since %s", terrstr());
return -1; return -1;
} }
pMnode->pSdb->sync = pMgmt->sync;
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0; return 0;

View File

@ -1645,8 +1645,6 @@ void mndTransPullup(SMnode *pMnode) {
} }
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} }
sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }

View File

@ -37,7 +37,7 @@ extern "C" {
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on // clang-format on
#define SDB_WRITE_DELTA 20 #define SDB_WRITE_DELTA 2000
#define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \ #define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \
{ \ { \

View File

@ -53,7 +53,6 @@ SSdb *sdbInit(SSdbOpt *pOption) {
} }
pSdb->pWal = pOption->pWal; pSdb->pWal = pOption->pWal;
pSdb->sync = pOption->sync;
pSdb->applyIndex = -1; pSdb->applyIndex = -1;
pSdb->applyTerm = -1; pSdb->applyTerm = -1;
pSdb->applyConfig = -1; pSdb->applyConfig = -1;

View File

@ -472,10 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
if (pSdb->pWal != NULL) { if (pSdb->pWal != NULL) {
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0); if (pSdb->sync > 0) {
if (pSdb->sync == 0) {
code = 0;
} else {
code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex); code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
} }
} }
@ -484,11 +481,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
} }
if (code == 0) { if (code == 0) {
if (pSdb->pWal != NULL) { if (pSdb->pWal != NULL) {
// code = walEndSnapshot(pSdb->pWal); if (pSdb->sync > 0) {
if (pSdb->sync == 0) {
code = 0;
} else {
code = syncEndSnapshot(pSdb->sync); code = syncEndSnapshot(pSdb->sync);
} }
} }

View File

@ -14,7 +14,6 @@ target_sources(
"src/vnd/vnodeSvr.c" "src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c" "src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c" "src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeCompact.c"
"src/vnd/vnodeRetention.c" "src/vnd/vnodeRetention.c"
# meta # meta
@ -53,7 +52,6 @@ target_sources(
"src/tsdb/tsdbCacheRead.c" "src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbRetention.c" "src/tsdb/tsdbRetention.c"
"src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbMergeTree.c" "src/tsdb/tsdbMergeTree.c"
"src/tsdb/tsdbDataIter.c" "src/tsdb/tsdbDataIter.c"
@ -69,10 +67,20 @@ target_sources(
"src/tq/tqSnapshot.c" "src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c" "src/tq/tqOffsetSnapshot.c"
) )
IF (TD_VNODE_PLUGINS)
target_sources(
vnode
PRIVATE
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c
)
ENDIF ()
target_include_directories( target_include_directories(
vnode vnode
PUBLIC "inc" PUBLIC "inc"
PRIVATE "src/inc" PUBLIC "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
) )
target_link_libraries( target_link_libraries(

View File

@ -106,10 +106,6 @@ int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode);
// vnodeCompact.c
int32_t vnodeAsyncCompact(SVnode* pVnode);
int32_t vnodeSyncCompact(SVnode* pVnode);
// vnodeSync.c // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode); int32_t vnodeSyncStart(SVnode* pVnode);

View File

@ -147,7 +147,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
} }
tqDebug("tmqsnap task execute end, get %p", pDataBlock); tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL) { if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->lastBlkUid; int64_t uid = pExec->pExecReader->lastBlkUid;

View File

@ -1,664 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
typedef struct {
STsdb *pTsdb;
int64_t commitID;
int8_t cmprAlg;
int32_t maxRows;
int32_t minRows;
STsdbFS fs;
int32_t fid;
TABLEID tbid;
SSkmInfo tbSkm;
// Tombstone
SDelFReader *pDelFReader;
SArray *aDelIdx; // SArray<SDelIdx>
SArray *aDelData; // SArray<SDelData>
SArray *aSkyLine; // SArray<TSDBKEY>
int32_t iDelIdx;
int32_t iSkyLine;
TSDBKEY *pDKey;
TSDBKEY dKey;
// Reader
SDataFReader *pReader;
STsdbDataIter2 *iterList; // list of iterators
STsdbDataIter2 *pIter;
SRBTree rbt;
// Writer
SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk; // SMapData<SDataBlk>
SArray *aSttBlk; // SArray<SSttBlk>
SBlockData bData;
SBlockData sData;
} STsdbCompactor;
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCompactor->pTsdb;
code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) {
int32_t code = 0;
int32_t lino = 0;
pCompactor->tbid = *pId;
// tombstone
for (;;) {
if (pCompactor->iDelIdx >= taosArrayGetSize(pCompactor->aDelIdx)) {
pCompactor->pDKey = NULL;
break;
}
SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(pCompactor->aDelIdx, pCompactor->iDelIdx);
int32_t c = tTABLEIDCmprFn(pDelIdx, &pCompactor->tbid);
if (c < 0) {
pCompactor->iDelIdx++;
} else if (c == 0) {
pCompactor->iDelIdx++;
code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1,
pCompactor->aSkyLine);
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->iSkyLine = 0;
if (pCompactor->iSkyLine < taosArrayGetSize(pCompactor->aSkyLine)) {
TSDBKEY *pKey = (TSDBKEY *)taosArrayGet(pCompactor->aSkyLine, pCompactor->iSkyLine);
pCompactor->dKey.version = 0;
pCompactor->dKey.ts = pKey->ts;
pCompactor->pDKey = &pCompactor->dKey;
} else {
pCompactor->pDKey = NULL;
}
break;
} else {
pCompactor->pDKey = NULL;
break;
}
}
// writer
code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm);
TSDB_CHECK_CODE(code, lino, _exit);
tMapDataReset(&pCompactor->mDataBlk);
code = tBlockDataInit(&pCompactor->bData, pId, pCompactor->tbSkm.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
if (!TABLE_SAME_SCHEMA(pCompactor->sData.suid, pCompactor->sData.uid, pId->suid, pId->uid)) {
if (pCompactor->sData.nRow > 0) {
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
TABLEID tbid = {.suid = pId->suid, .uid = pId->suid ? 0 : pId->uid};
code = tBlockDataInit(&pCompactor->sData, &tbid, pCompactor->tbSkm.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pId->suid,
pId->uid);
}
return code;
}
static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
if (pCompactor->bData.nRow > 0) {
if (pCompactor->bData.nRow < pCompactor->minRows) {
for (int32_t iRow = 0; iRow < pCompactor->bData.nRow; iRow++) {
code = tBlockDataAppendRow(&pCompactor->sData, &tsdbRowFromBlockData(&pCompactor->bData, iRow), NULL,
pCompactor->tbid.uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->sData.nRow >= pCompactor->maxRows) {
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
tBlockDataClear(&pCompactor->bData);
} else {
code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (pCompactor->mDataBlk.nItem > 0) {
SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1);
if (pBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pBlockIdx->suid = pCompactor->tbid.suid;
pBlockIdx->uid = pCompactor->tbid.uid;
code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__,
pCompactor->tbid.suid, pCompactor->tbid.uid);
}
return code;
}
static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) {
TSDBKEY tKey = TSDBROW_KEY(pRow);
TSDBKEY *aKey = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine);
int32_t nKey = TARRAY_SIZE(pCompactor->aSkyLine);
if (tKey.ts > pCompactor->pDKey->ts) {
do {
pCompactor->pDKey->version = aKey[pCompactor->iSkyLine].version;
pCompactor->iSkyLine++;
if (pCompactor->iSkyLine < nKey) {
pCompactor->dKey.ts = aKey[pCompactor->iSkyLine].ts;
} else {
if (pCompactor->pDKey->version == 0) {
pCompactor->pDKey = NULL;
return false;
} else {
pCompactor->pDKey->ts = INT64_MAX;
}
}
} while (tKey.ts > pCompactor->pDKey->ts);
}
if (tKey.ts < pCompactor->pDKey->ts) {
if (tKey.version > pCompactor->pDKey->version) {
return false;
} else {
return true;
}
} else if (tKey.ts == pCompactor->pDKey->ts) {
ASSERT(pCompactor->iSkyLine < nKey);
if (tKey.version > TMAX(pCompactor->pDKey->version, aKey[pCompactor->iSkyLine].version)) {
return false;
} else {
return true;
}
}
return false;
}
static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *pRowInfo) {
int32_t code = 0;
int32_t lino = 0;
// start a new table data write if need
if (pRowInfo == NULL || pRowInfo->uid != pCompactor->tbid.uid) {
if (pCompactor->tbid.uid) {
code = tsdbCompactWriteTableDataEnd(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pRowInfo == NULL) {
if (pCompactor->sData.nRow > 0) {
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
return code;
}
code = tsdbCompactWriteTableDataStart(pCompactor, (TABLEID *)pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
}
// check if row is deleted
if (pCompactor->pDKey && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit;
if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) {
code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code));
} else if (pRowInfo) {
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
TD_VID(pCompactor->pTsdb->pVnode), __func__, pRowInfo->suid, pRowInfo->uid, TSDBROW_TS(&pRowInfo->row),
TSDBROW_VERSION(&pRowInfo->row));
}
return code;
}
static bool tsdbCompactTableIsDropped(STsdbCompactor *pCompactor) {
SMetaInfo info;
if (pCompactor->pIter->rowInfo.uid == pCompactor->tbid.uid) return false;
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->pIter->rowInfo.uid, &info, NULL)) {
return true;
}
return false;
}
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
if (pCompactor->pIter) {
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
pCompactor->pIter = NULL;
} else {
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt);
if (pNode) {
int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode);
if (c > 0) {
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
pCompactor->pIter = NULL;
} else if (c == 0) {
ASSERT(0);
}
}
}
}
if (pCompactor->pIter == NULL) {
SRBTreeNode *pNode = tRBTreeDropMin(&pCompactor->rbt);
if (pNode) {
pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
}
}
if (pCompactor->pIter) {
if (tsdbCompactTableIsDropped(pCompactor)) {
TABLEID tbid = {.suid = pCompactor->pIter->rowInfo.suid, .uid = pCompactor->pIter->rowInfo.uid};
tRBTreeClear(&pCompactor->rbt);
for (pCompactor->pIter = pCompactor->iterList; pCompactor->pIter; pCompactor->pIter = pCompactor->pIter->next) {
code = tsdbDataIterNext2(pCompactor->pIter,
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_TABLEID, .tbid = tbid});
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid) {
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
}
}
} else {
*ppRowInfo = &pCompactor->pIter->rowInfo;
break;
}
} else {
*ppRowInfo = NULL;
break;
}
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code;
}
static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pSet) {
int32_t code = 0;
int32_t lino = 0;
pCompactor->fid = pSet->fid;
pCompactor->tbid = (TABLEID){0};
/* tombstone */
pCompactor->iDelIdx = 0;
/* reader */
code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbOpenDataFileDataIter(pCompactor->pReader, &pCompactor->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn);
if (pCompactor->pIter) {
pCompactor->pIter->next = pCompactor->iterList;
pCompactor->iterList = pCompactor->pIter;
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
}
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
code = tsdbOpenSttFileDataIter(pCompactor->pReader, iStt, &pCompactor->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->pIter) {
pCompactor->pIter->next = pCompactor->iterList;
pCompactor->iterList = pCompactor->pIter;
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
}
}
pCompactor->pIter = NULL;
/* writer */
code = tsdbDataFWriterOpen(&pCompactor->pWriter, pCompactor->pTsdb,
&(SDFileSet){.fid = pCompactor->fid,
.diskId = pSet->diskId,
.pHeadF = &(SHeadFile){.commitID = pCompactor->commitID},
.pDataF = &(SDataFile){.commitID = pCompactor->commitID},
.pSmaF = &(SSmaFile){.commitID = pCompactor->commitID},
.nSttF = 1,
.aSttF = {&(SSttFile){.commitID = pCompactor->commitID}}});
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->aBlockIdx) {
taosArrayClear(pCompactor->aBlockIdx);
} else if ((pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tMapDataReset(&pCompactor->mDataBlk);
if (pCompactor->aSttBlk) {
taosArrayClear(pCompactor->aSttBlk);
} else if ((pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
tBlockDataReset(&pCompactor->bData);
tBlockDataReset(&pCompactor->sData);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code), pCompactor->fid);
} else {
tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid);
}
return code;
}
static int32_t tsdbCompactFileSetEnd(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
ASSERT(pCompactor->bData.nRow == 0);
ASSERT(pCompactor->sData.nRow == 0);
/* update files */
code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbUpdateDFileSetHeader(pCompactor->pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFWriterClose(&pCompactor->pWriter, 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFReaderClose(&pCompactor->pReader);
TSDB_CHECK_CODE(code, lino, _exit);
/* do clear */
while ((pCompactor->pIter = pCompactor->iterList) != NULL) {
pCompactor->iterList = pCompactor->pIter->next;
tsdbCloseDataIter2(pCompactor->pIter);
}
tBlockDataReset(&pCompactor->bData);
tBlockDataReset(&pCompactor->sData);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code), pCompactor->fid);
} else {
tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid);
}
return code;
}
static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) {
int32_t code = 0;
int32_t lino = 0;
// start compact
code = tsdbCompactFileSetStart(pCompactor, pSet);
TSDB_CHECK_CODE(code, lino, _exit);
// do compact, end with a NULL row
SRowInfo *pRowInfo;
do {
code = tsdbCompactNextRow(pCompactor, &pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactWriteTableData(pCompactor, pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
} while (pRowInfo);
// end compact
code = tsdbCompactFileSetEnd(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code), pCompactor->fid);
if (pCompactor->pWriter) tsdbDataFWriterClose(&pCompactor->pWriter, 0);
while ((pCompactor->pIter = pCompactor->iterList)) {
pCompactor->iterList = pCompactor->pIter->next;
tsdbCloseDataIter2(pCompactor->pIter);
}
if (pCompactor->pReader) tsdbDataFReaderClose(&pCompactor->pReader);
}
return code;
}
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
// writer
tBlockDataDestroy(&pCompactor->sData);
tBlockDataDestroy(&pCompactor->bData);
taosArrayDestroy(pCompactor->aSttBlk);
tMapDataClear(&pCompactor->mDataBlk);
taosArrayDestroy(pCompactor->aBlockIdx);
// reader
// tombstone
taosArrayDestroy(pCompactor->aSkyLine);
taosArrayDestroy(pCompactor->aDelData);
taosArrayDestroy(pCompactor->aDelIdx);
// others
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
tsdbFSDestroy(&pCompactor->fs);
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID);
}
static int32_t tsdbBeginCompact(STsdb *pTsdb, SCompactInfo *pInfo, STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
pCompactor->pTsdb = pTsdb;
pCompactor->commitID = pInfo->commitID;
pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
pCompactor->fid = INT32_MIN;
code = tsdbFSCopy(pTsdb, &pCompactor->fs);
TSDB_CHECK_CODE(code, lino, _exit);
/* tombstone */
if (pCompactor->fs.pDelFile) {
code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
if ((pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx);
TSDB_CHECK_CODE(code, lino, _exit);
}
/* reader */
/* writer */
code = tBlockDataCreate(&pCompactor->bData);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataCreate(&pCompactor->sData);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, lino,
tstrerror(code), pCompactor->commitID);
tBlockDataDestroy(&pCompactor->sData);
tBlockDataDestroy(&pCompactor->bData);
if (pCompactor->fs.pDelFile) {
taosArrayDestroy(pCompactor->aSkyLine);
taosArrayDestroy(pCompactor->aDelData);
taosArrayDestroy(pCompactor->aDelIdx);
if (pCompactor->pDelFReader) tsdbDelFReaderClose(&pCompactor->pDelFReader);
}
tsdbFSDestroy(&pCompactor->fs);
} else {
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pCompactor->commitID);
}
return code;
}
int32_t tsdbCompact(STsdb *pTsdb, SCompactInfo *pInfo) {
int32_t code = 0;
STsdbCompactor *pCompactor = &(STsdbCompactor){0};
if ((code = tsdbBeginCompact(pTsdb, pInfo, pCompactor))) return code;
for (;;) {
SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
tDFileSetCmprFn, TD_GT);
if (pSet == NULL) {
pCompactor->fid = INT32_MAX;
break;
}
if ((code = tsdbCompactFileSet(pCompactor, pSet))) goto _exit;
}
if ((code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL))) goto _exit;
_exit:
if (code) {
tsdbAbortCompact(pCompactor);
} else {
tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
}
tsdbEndCompact(pCompactor);
return code;
}
int32_t tsdbCommitCompact(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}

View File

@ -404,7 +404,7 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk, tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
oldIndex, pIter->uid, pIter->iStt, idStr); oldIndex, pIter->uid, pIter->iStt, idStr);
} else { } else {
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index::%d, %s", pIter->uid, oldIndex, idStr); tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr);
} }
} }

View File

@ -44,6 +44,7 @@ typedef struct SBlockIndex {
typedef struct STableBlockScanInfo { typedef struct STableBlockScanInfo {
uint64_t uid; uint64_t uid;
TSKEY lastKey; TSKEY lastKey;
TSKEY lastKeyInStt; // last accessed key in stt
SMapData mapData; // block info (compressed) SMapData mapData; // block info (compressed)
SArray* pBlockList; // block data index list, SArray<SBlockIndex> SArray* pBlockList; // block data index list, SArray<SBlockIndex>
SIterInfo iter; // mem buffer skip list iterator SIterInfo iter; // mem buffer skip list iterator
@ -192,7 +193,7 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger); SRowMerger* pMerger);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange); SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
@ -402,9 +403,11 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey; int64_t skey = pTsdbReader->window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastKeyInStt = skey;
} else { } else {
int64_t ekey = pTsdbReader->window.ekey; int64_t ekey = pTsdbReader->window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastKeyInStt = ekey;
} }
taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
@ -1795,21 +1798,18 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
while (1) { while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) { if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->lastKeyInStt += step;
return false; return false;
} }
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY k = TSDBROW_KEY(&row); TSDBKEY k = TSDBROW_KEY(&row);
if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) { pScanInfo->lastKeyInStt = k.ts;
pScanInfo->lastKey = k.ts;
} else { if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
// the qualifed ts may equal to k.ts, only a greater version one. // the qualifed ts may equal to k.ts, only a greater version one.
// here we need to fallback one step. // here we need to fallback one step.
if (pScanInfo->lastKey == k.ts) {
pScanInfo->lastKey -= step;
}
return true; return true;
} }
} }
@ -1949,7 +1949,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
} }
if (minKey == k.ts) { if (minKey == k.ts) {
@ -1997,7 +1997,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
} }
if (minKey == key) { if (minKey == key) {
@ -2050,7 +2050,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -2068,7 +2068,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
// merge with block data if ts == key // merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
@ -2121,7 +2121,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -2230,7 +2230,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
} }
if (minKey == ik.ts) { if (minKey == ik.ts) {
@ -2321,7 +2321,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
} }
if (minKey == key) { if (minKey == key) {
@ -2472,9 +2472,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
STimeWindow w = pLBlockReader->window; STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) { if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKey + step; w.skey = pScanInfo->lastKeyInStt;
} else { } else {
w.ekey = pScanInfo->lastKey + step; w.ekey = pScanInfo->lastKeyInStt;
} }
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
@ -3512,6 +3512,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
CHECK_FILEBLOCK_STATE* state) { CHECK_FILEBLOCK_STATE* state) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
bool asc = ASCENDING_TRAVERSE(pReader->order);
*state = CHECK_FILEBLOCK_QUIT; *state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
@ -3522,7 +3523,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) { if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
pDumpInfo->rowIndex = pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) { if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
*state = CHECK_FILEBLOCK_CONT; *state = CHECK_FILEBLOCK_CONT;
} }
} }
@ -3567,13 +3568,16 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
} }
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange) { SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) { while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(pMerger, &fRow1); tsdbRowMerge(pMerger, &fRow1);
} else { } else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
idStr);
break; break;
} }
} }
@ -3936,6 +3940,17 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pInfo->uid = pList[i].uid; pInfo->uid = pList[i].uid;
pUidList->tableUidList[i] = pList[i].uid; pUidList->tableUidList[i] = pList[i].uid;
// todo extract method
if (ASCENDING_TRAVERSE(pReader->order)) {
int64_t skey = pReader->window.skey;
pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->lastKeyInStt = skey;
} else {
int64_t ekey = pReader->window.ekey;
pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->lastKeyInStt = ekey;
}
taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
} }
@ -4720,8 +4735,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return code; return code;
} }
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) { static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows, int32_t numOfBucket) {
return (numOfRows - startRow) / bucketRange; int32_t bucketIndex = (numOfRows - startRow) / bucketRange;
if (bucketIndex == numOfBucket) {
bucketIndex -= 1;
}
return bucketIndex;
} }
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
@ -4730,8 +4749,9 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->totalRows = 0; pTableBlockInfo->totalRows = 0;
pTableBlockInfo->numOfVgroups = 1; pTableBlockInfo->numOfVgroups = 1;
// find the start data block in file const int32_t numOfBucket = 20.0;
// find the start data block in file
tsdbAcquireReader(pReader); tsdbAcquireReader(pReader);
if (pReader->suspended) { if (pReader->suspended) {
tsdbReaderResume(pReader); tsdbReaderResume(pReader);
@ -4742,7 +4762,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->defMinRows = pc->minRows; pTableBlockInfo->defMinRows = pc->minRows;
pTableBlockInfo->defMaxRows = pc->maxRows; pTableBlockInfo->defMaxRows = pc->maxRows;
int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0); int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / numOfBucket);
pTableBlockInfo->numOfFiles += 1; pTableBlockInfo->numOfFiles += 1;
@ -4780,7 +4800,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock; pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows); int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBucket);
pTableBlockInfo->blockRowsHisto[bucketIndex]++; pTableBlockInfo->blockRowsHisto[bucketIndex]++;
hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr); hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);

View File

@ -1,120 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
extern int32_t tsdbCommitCompact(STsdb *pTsdb);
static int32_t vnodeCompactTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
SCompactInfo *pInfo = (SCompactInfo *)param;
SVnode *pVnode = pInfo->pVnode;
// do compact
code = tsdbCompact(pInfo->pVnode->pTsdb, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
// end compact
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vnodeCommitInfo(dir);
tsdbCommitCompact(pVnode->pTsdb);
_exit:
tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
return code;
}
static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
tsem_wait(&pVnode->canCommit);
pInfo->pVnode = pVnode;
pInfo->flag = 0;
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
SVnodeInfo info = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeLoadInfo(dir, &info) < 0) {
code = terrno;
goto _exit;
}
info.state.commitID = pInfo->commitID;
if (vnodeSaveInfo(dir, &info) < 0) {
code = terrno;
goto _exit;
}
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code),
pVnode->state.commitID);
} else {
vDebug("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pVnode), __func__, pVnode->state.commitID);
}
return code;
}
int32_t vnodeAsyncCompact(SVnode *pVnode) {
int32_t code = 0;
int32_t lino = 0;
SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
if (pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
vnodeAsyncCommit(pVnode);
code = vnodePrepareCompact(pVnode, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeScheduleTask(vnodeCompactTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
if (pInfo) taosMemoryFree(pInfo);
} else {
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
}
return code;
}
int32_t vnodeSyncCompact(SVnode *pVnode) {
vnodeAsyncCompact(pVnode);
tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
return 0;
}

View File

@ -599,16 +599,10 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
// process // process
#if 0
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
if (code) goto _exit;
code = smaDoRetention(pVnode->pSma, trimReq.timestamp);
if (code) goto _exit;
#else
vnodeAsyncRentention(pVnode, trimReq.timestamp); vnodeAsyncRentention(pVnode, trimReq.timestamp);
#endif tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
_exit: _exit:
return code; return code;
@ -633,18 +627,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
tqUpdateTbUidList(pVnode->pTq, tbUids, false); tqUpdateTbUidList(pVnode->pTq, tbUids, false);
} }
#if 0
// process
ret = tsdbDoRetention(pVnode->pTsdb, ttlReq.timestamp);
if (ret) goto end;
ret = smaDoRetention(pVnode->pSma, ttlReq.timestamp);
if (ret) goto end;
#else
vnodeAsyncRentention(pVnode, ttlReq.timestamp); vnodeAsyncRentention(pVnode, ttlReq.timestamp);
tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
#endif
end: end:
taosArrayDestroy(tbUids); taosArrayDestroy(tbUids);
@ -1679,17 +1662,14 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SCompactVnodeReq req = {0}; return vnodeProcessCompactVnodeReqImpl(pVnode, version, pReq, len, pRsp);
if (tDeserializeSCompactVnodeReq(pReq, len, &req) != 0) { }
terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG;
}
vInfo("vgId:%d, compact msg will be processed, db:%s dbUid:%" PRId64 " compactStartTime:%" PRId64, TD_VID(pVnode),
req.db, req.dbUid, req.compactStartTime);
vnodeAsyncCompact(pVnode);
vnodeBegin(pVnode);
#ifndef TD_ENTERPRISE
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
return 0; return 0;
} }
#endif

View File

@ -248,13 +248,13 @@ static const char* cacheModelStr(int8_t cacheModel) {
return TSDB_CACHE_MODEL_NONE_STR; return TSDB_CACHE_MODEL_NONE_STR;
} }
static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, SDbCfgInfo* pCfg) { static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, char* dbFName, SDbCfgInfo* pCfg) {
blockDataEnsureCapacity(pBlock, 1); blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1; pBlock->info.rows = 1;
SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, 0);
char buf1[SHOW_CREATE_DB_RESULT_FIELD1_LEN] = {0}; char buf1[SHOW_CREATE_DB_RESULT_FIELD1_LEN] = {0};
STR_TO_VARSTR(buf1, dbFName); STR_TO_VARSTR(buf1, dbName);
colDataSetVal(pCol1, 0, buf1, false); colDataSetVal(pCol1, 0, buf1, false);
SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1); SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1);
@ -277,16 +277,20 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
} }
char* retentions = buildRetension(pCfg->pRetensions); char* retentions = buildRetension(pCfg->pRetensions);
int32_t dbFNameLen = strlen(dbFName);
int32_t hashPrefix = (pCfg->hashPrefix > (dbFNameLen + 1)) ? (pCfg->hashPrefix - dbFNameLen - 1) : 0;
len += sprintf( len += sprintf(
buf2 + VARSTR_HEADER_SIZE, buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm " "CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d", "WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d TABLE_PREFIX %d TABLE_SUFFIX %d TSDB_PAGESIZE %d "
dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, "WAL_RETENTION_PERIOD %d WAL_RETENTION_SIZE %" PRId64 " WAL_ROLL_PERIOD %d WAL_SEGMENT_SIZE %" PRId64,
dbName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile,
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups, pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables); 1 == pCfg->numOfStables, hashPrefix, pCfg->hashSuffix, pCfg->tsdbPageSize, pCfg->walRetentionPeriod,
pCfg->walRetentionSize, pCfg->walRollPeriod, pCfg->walSegmentSize);
if (retentions) { if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
@ -404,7 +408,7 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
int32_t code = buildCreateDBResultDataBlock(&pBlock); int32_t code = buildCreateDBResultDataBlock(&pBlock);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setCreateDBResultIntoDataBlock(pBlock, pStmt->dbName, pStmt->pCfg); setCreateDBResultIntoDataBlock(pBlock, pStmt->dbName, pStmt->dbFName, pStmt->pCfg);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildRetrieveTableRsp(pBlock, SHOW_CREATE_DB_RESULT_COLS, pRsp); code = buildRetrieveTableRsp(pBlock, SHOW_CREATE_DB_RESULT_COLS, pRsp);

View File

@ -732,7 +732,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
extern void doDestroyExchangeOperatorInfo(void* param); extern void doDestroyExchangeOperatorInfo(void* param);

View File

@ -1444,16 +1444,21 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
// todo add more information about exchange operation // todo add more information about exchange operation
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN ||
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
*order = TSDB_ORDER_ASC; *order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN; *scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
if (!inheritUsOrder) {
*order = TSDB_ORDER_ASC;
}
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->base.cond.order; *order = pTableScanInfo->base.cond.order;
@ -1468,7 +1473,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} else { } else {
return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag); return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder);
} }
} }
} }
@ -1584,7 +1589,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
hasValidBlock = true; hasValidBlock = true;
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock); destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);

View File

@ -69,7 +69,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag); getTableScanInfo(pOperator, &order, &scanFlag, false);
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
@ -128,7 +128,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag); getTableScanInfo(pOperator, &order, &scanFlag, false);
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {

View File

@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
break; break;
} }
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }

View File

@ -289,7 +289,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag); int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -441,7 +441,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag); int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }

View File

@ -1072,7 +1072,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag, true);
if (pInfo->scalarSupp.pExprInfo != NULL) { if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp; SExprSupp* pExprSup = &pInfo->scalarSupp;
@ -4294,7 +4294,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
} }
} }
getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag, false);
setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true); setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
@ -4621,7 +4621,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag, false);
setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true); setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);

View File

@ -1211,7 +1211,8 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
} else { } else {
res = translateNormalValue(pCxt, pVal, targetDt, strict); res = translateNormalValue(pCxt, pVal, targetDt, strict);
} }
pVal->node.resType = targetDt; pVal->node.resType.type = targetDt.type;
pVal->node.resType.bytes = targetDt.bytes;
pVal->node.resType.scale = pVal->unit; pVal->node.resType.scale = pVal->unit;
pVal->translate = true; pVal->translate = true;
if (!strict && TSDB_DATA_TYPE_UBIGINT == pVal->node.resType.type && pVal->datum.u <= INT64_MAX) { if (!strict && TSDB_DATA_TYPE_UBIGINT == pVal->node.resType.type && pVal->datum.u <= INT64_MAX) {
@ -6477,6 +6478,11 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD
if (NULL == pStmt->pCfg) { if (NULL == pStmt->pCfg) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, pStmt->dbFName);
return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg); return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg);
} }

View File

@ -915,6 +915,7 @@ int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_
SDataType t = {.type = type}; SDataType t = {.type = type};
t.bytes = IS_VAR_DATA_TYPE(t.type)? input->columnData->info.bytes:tDataTypes[type].bytes; t.bytes = IS_VAR_DATA_TYPE(t.type)? input->columnData->info.bytes:tDataTypes[type].bytes;
t.precision = input->columnData->info.precision;
int32_t code = sclCreateColumnInfoData(&t, input->numOfRows, output); int32_t code = sclCreateColumnInfoData(&t, input->numOfRows, output);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -278,6 +278,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
msg.contLen = tlen + sizeof(SMsgHead); msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf; msg.pCont = buf;
msg.msgType = TDMT_STREAM_RECOVER_FINISH; msg.msgType = TDMT_STREAM_RECOVER_FINISH;
msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
@ -522,4 +523,3 @@ FREE:
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
return code; return code;
} }

View File

@ -1173,7 +1173,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
addr.sin_port = (uint16_t)htons(pList->port); addr.sin_port = (uint16_t)htons(pList->port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) { if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
tstrerror(TAOS_SYSTEM_ERROR(errno))); tstrerror(TAOS_SYSTEM_ERROR(errno)));

View File

@ -677,14 +677,21 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
SServerObj* pObj = container_of(stream, SServerObj, server); SServerObj* pObj = container_of(stream, SServerObj, server);
uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_t* cli = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, cli); if (cli == NULL) return;
if (uv_accept(stream, (uv_stream_t*)cli) == 0) { int err = uv_tcp_init(pObj->loop, cli);
if (err != 0) {
tError("failed to create tcp: %s", uv_err_name(err));
taosMemoryFree(cli);
return;
}
err = uv_accept(stream, (uv_stream_t*)cli);
if (err == 0) {
#if defined(WINDOWS) || defined(DARWIN) #if defined(WINDOWS) || defined(DARWIN)
if (pObj->numOfWorkerReady < pObj->numOfThreads) { if (pObj->numOfWorkerReady < pObj->numOfThreads) {
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
pObj->numOfWorkerReady); pObj->numOfWorkerReady);
uv_close((uv_handle_t*)cli, NULL); uv_close((uv_handle_t*)cli, uvFreeCb);
return; return;
} }
#endif #endif
@ -700,8 +707,10 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
} else { } else {
if (!uv_is_closing((uv_handle_t*)cli)) { if (!uv_is_closing((uv_handle_t*)cli)) {
uv_close((uv_handle_t*)cli, NULL); tError("failed to accept tcp: %s", uv_err_name(err));
uv_close((uv_handle_t*)cli, uvFreeCb);
} else { } else {
tError("failed to accept tcp: %s", uv_err_name(err));
taosMemoryFree(cli); taosMemoryFree(cli);
} }
} }

View File

@ -114,27 +114,28 @@ static void *taosProcessAlarmSignal(void *tharg) {
taosThreadCleanupPush(taosDeleteTimer, &timerId); taosThreadCleanupPush(taosDeleteTimer, &timerId);
struct itimerspec ts; do {
ts.it_value.tv_sec = 0; struct itimerspec ts;
ts.it_value.tv_nsec = 1000000 * MSECONDS_PER_TICK; ts.it_value.tv_sec = 0;
ts.it_interval.tv_sec = 0; ts.it_value.tv_nsec = 1000000 * MSECONDS_PER_TICK;
ts.it_interval.tv_nsec = 1000000 * MSECONDS_PER_TICK; ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 1000000 * MSECONDS_PER_TICK;
if (timer_settime(timerId, 0, &ts, NULL)) { if (timer_settime(timerId, 0, &ts, NULL)) {
// printf("Failed to init timer"); // printf("Failed to init timer");
return NULL; break;
}
int signo;
while (!stopTimer) {
if (sigwait(&sigset, &signo)) {
// printf("Failed to wait signal: number %d", signo);
continue;
} }
/* //printf("Signal handling: number %d ......\n", signo); */
callback(0); int signo;
} while (!stopTimer) {
if (sigwait(&sigset, &signo)) {
// printf("Failed to wait signal: number %d", signo);
continue;
}
/* //printf("Signal handling: number %d ......\n", signo); */
callback(0);
}
} while (0);
taosThreadCleanupPop(1); taosThreadCleanupPop(1);

View File

@ -94,6 +94,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")

View File

@ -42,17 +42,17 @@ class TDTestCase:
tdSql.query('show create database scd;') tdSql.query('show create database scd;')
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd') tdSql.checkData(0, 0, 'scd')
tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0 TABLE_PREFIX 0 TABLE_SUFFIX 0 TSDB_PAGESIZE 4 WAL_RETENTION_PERIOD 0 WAL_RETENTION_SIZE 0 WAL_ROLL_PERIOD 0 WAL_SEGMENT_SIZE 0")
tdSql.query('show create database scd2;') tdSql.query('show create database scd2;')
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd2') tdSql.checkData(0, 0, 'scd2')
tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0 TABLE_PREFIX 0 TABLE_SUFFIX 0 TSDB_PAGESIZE 4 WAL_RETENTION_PERIOD 0 WAL_RETENTION_SIZE 0 WAL_ROLL_PERIOD 0 WAL_SEGMENT_SIZE 0")
tdSql.query('show create database scd4') tdSql.query('show create database scd4')
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd4') tdSql.checkData(0, 0, 'scd4')
tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0 TABLE_PREFIX 0 TABLE_SUFFIX 0 TSDB_PAGESIZE 4 WAL_RETENTION_PERIOD 0 WAL_RETENTION_SIZE 0 WAL_ROLL_PERIOD 0 WAL_SEGMENT_SIZE 0")
self.restartTaosd(1, dbname='scd') self.restartTaosd(1, dbname='scd')
@ -60,17 +60,17 @@ class TDTestCase:
tdSql.query('show create database scd;') tdSql.query('show create database scd;')
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd') tdSql.checkData(0, 0, 'scd')
tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0 TABLE_PREFIX 0 TABLE_SUFFIX 0 TSDB_PAGESIZE 4 WAL_RETENTION_PERIOD 0 WAL_RETENTION_SIZE 0 WAL_ROLL_PERIOD 0 WAL_SEGMENT_SIZE 0")
tdSql.query('show create database scd2;') tdSql.query('show create database scd2;')
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd2') tdSql.checkData(0, 0, 'scd2')
tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0 TABLE_PREFIX 0 TABLE_SUFFIX 0 TSDB_PAGESIZE 4 WAL_RETENTION_PERIOD 0 WAL_RETENTION_SIZE 0 WAL_ROLL_PERIOD 0 WAL_SEGMENT_SIZE 0")
tdSql.query('show create database scd4') tdSql.query('show create database scd4')
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd4') tdSql.checkData(0, 0, 'scd4')
tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0 TABLE_PREFIX 0 TABLE_SUFFIX 0 TSDB_PAGESIZE 4 WAL_RETENTION_PERIOD 0 WAL_RETENTION_SIZE 0 WAL_ROLL_PERIOD 0 WAL_SEGMENT_SIZE 0")
tdSql.execute('drop database scd') tdSql.execute('drop database scd')

View File

@ -632,6 +632,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/tb_100w_data_order.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/tb_100w_data_order.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
@ -655,6 +657,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tagFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tagFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py
,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py ,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5
@ -680,8 +683,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
@ -738,7 +741,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py # ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStbCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStbCtb.py
@ -854,6 +857,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 3
@ -945,13 +949,14 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 4
@ -1070,6 +1075,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_data.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_data.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_data.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_data.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py

View File

@ -45,6 +45,8 @@ class TDSimClient:
"supportVnodes": "1024", "supportVnodes": "1024",
"enableQueryHb": "1", "enableQueryHb": "1",
"telemetryReporting": "0", "telemetryReporting": "0",
"tqDebugflag": "135",
"wDebugflag":"135",
} }
def getLogDir(self): def getLogDir(self):

View File

@ -0,0 +1,136 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o demo demo.c -ltaos
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h" // TAOS header file
static void queryDB(TAOS *taos, char *command) {
int i;
TAOS_RES *pSql = NULL;
int32_t code = -1;
for (i = 0; i < 5; i++) {
if (NULL != pSql) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(pSql);
}
void Test(TAOS *taos, char *qstr);
int main(int argc, char *argv[]) {
char qstr[1024];
// connect to server
if (argc < 2) {
printf("please input server-ip \n");
return 0;
}
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
exit(1);
}
Test(taos, qstr);
taos_close(taos);
taos_cleanup();
}
void Test(TAOS *taos, char *qstr) {
queryDB(taos, "drop database if exists demo");
queryDB(taos, "create database demo vgroups 1 minrows 10");
TAOS_RES *result;
queryDB(taos, "use demo");
queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
printf("success to create table\n");
int i = 0;
for (int32_t n = 0; n < 10; ++n) {
for (i = 0; i < 10; ++i) {
int32_t v = n * 10 + i;
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)1546300800000, v, v, v, v*10000000, v*1.0, v*2.0, "hello");
printf("qstr: %s\n", qstr);
TAOS_RES *result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
taos_free_result(result1);
exit(1);
} else {
printf("insert row: %i\n", v);
}
taos_free_result(result1);
strcpy(qstr, "flush database demo");
result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to fluash database, reason:%s\n", taos_errstr(result1));
taos_free_result(result1);
exit(1);
}
taos_free_result(result1);
}
}
// query the records
sprintf(qstr, "SELECT * FROM m1 order by ts desc");
result = taos_query(taos, qstr);
if (result == NULL || taos_errno(result) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(result));
taos_free_result(result);
exit(1);
}
TAOS_ROW row;
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
printf("num_fields = %d\n", num_fields);
printf("select * from table order by ts desc, result:\n");
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
}

View File

@ -14,6 +14,7 @@ exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS) gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS) gcc $(CFLAGS) ./stopquery.c -o $(ROOT)stopquery $(LFLAGS)
gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS) gcc $(CFLAGS) ./dbTableRoute.c -o $(ROOT)dbTableRoute $(LFLAGS)
gcc $(CFLAGS) ./insertSameTs.c -o $(ROOT)insertSameTs $(LFLAGS)
clean: clean:
rm $(ROOT)batchprepare rm $(ROOT)batchprepare

View File

@ -49,9 +49,23 @@ while $i < $tbNum
$nchar = $nchar . $c $nchar = $nchar . $c
$nchar = $nchar . ' $nchar = $nchar . '
sql insert into $tb values ($tstart , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) $next = $tstart + 30
$tstart = $tstart + 30 $f = $x + 1
$x = $x + 1 $c1 = $f / 100
$c1 = $c1 * 100
$c1 = $f - $c1
$binary1 = ' . binary
$binary1 = $binary1 . $c1
$binary1 = $binary1 . '
$nchar1 = ' . nchar
$nchar1 = $nchar1 . $c1
$nchar1 = $nchar1 . '
sql insert into $tb values ($tstart , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) ($next , $c1 , $c1 , $c1 , $c1 , $c1 , $c1 , $c1 , $binary1 , $nchar1 )
$tstart = $tstart + 60
$x = $x + 2
endw endw
$i = $i + 1 $i = $i + 1

View File

@ -48,24 +48,35 @@ sleep 100
#=================================================================== #===================================================================
print =============== query data from child table print =============== query data from child table
$loop_count = 0
loop0:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows print rows: $rows
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03
if $rows != 1 then if $rows != 1 then
return -1 goto loop0
endi endi
if $data01 != 234 then if $data01 != 234 then
return -1 goto loop0
endi endi
if $data02 != 234 then if $data02 != 234 then
return -1 goto loop0
endi endi
if $data03 != 234 then if $data03 != 234 then
print expect 234, actual $data03 print expect 234, actual $data03
return -1 goto loop0
endi endi
#=================================================================== #===================================================================
@ -77,23 +88,34 @@ sleep 100
#=================================================================== #===================================================================
print =============== query data from child table print =============== query data from child table
$loop_count = 0
loop1:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows print rows: $rows
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03
if $rows != 1 then if $rows != 1 then
return -1 goto loop1
endi endi
if $data01 != -111 then if $data01 != -111 then
return -1 goto loop1
endi endi
if $data02 != 234 then if $data02 != 234 then
return -1 goto loop1
endi endi
if $data03 != 123 then if $data03 != 123 then
return -1 goto loop1
endi endi
#=================================================================== #===================================================================
@ -105,36 +127,46 @@ sleep 100
#=================================================================== #===================================================================
print =============== query data from child table print =============== query data from child table
$loop_count = 0
loop2:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb
print rows: $rows print rows: $rows
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13
if $rows != 2 then if $rows != 2 then
return -1 goto loop2
endi endi
if $data01 != -111 then if $data01 != -111 then
return -1 goto loop2
endi endi
if $data02 != 234 then if $data02 != 234 then
return -1 goto loop2
endi endi
if $data03 != 123 then if $data03 != 123 then
return -1 goto loop2
endi endi
if $data11 != 789 then if $data11 != 789 then
return -1 goto loop2
endi endi
if $data12 != 789 then if $data12 != 789 then
return -1 goto loop2
endi endi
if $data13 != 789 then if $data13 != 789 then
return -1 goto loop2
endi endi
_OVER: _OVER:

View File

@ -12,13 +12,16 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import re
from util.log import * from util.log import *
from util.cases import * from util.cases import *
from util.sql import * from util.sql import *
from util.common import * from util.common import *
from util.sqlset import * from util.sqlset import *
class TDTestCase: class TDTestCase:
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
@ -28,7 +31,7 @@ class TDTestCase:
self.ins_param_list = ['dnodes','mnodes','qnodes','cluster','functions','users','grants','topics','subscriptions','streams'] self.ins_param_list = ['dnodes','mnodes','qnodes','cluster','functions','users','grants','topics','subscriptions','streams']
self.perf_param = ['apps','connections','consumers','queries','transactions'] self.perf_param = ['apps','connections','consumers','queries','transactions']
self.perf_param_list = ['apps','connections','consumers','queries','trans'] self.perf_param_list = ['apps','connections','consumers','queries','trans']
def ins_check(self): def ins_check(self):
tdSql.prepare() tdSql.prepare()
for param in self.ins_param_list: for param in self.ins_param_list:
@ -113,7 +116,25 @@ class TDTestCase:
query_result = tdSql.queryResult query_result = tdSql.queryResult
tdSql.checkEqual(query_result[0][1].lower(),sql) tdSql.checkEqual(query_result[0][1].lower(),sql)
tdSql.execute('drop database db') tdSql.execute('drop database db')
def check_gitinfo(self):
taosd_gitinfo_sql = ''
tdSql.query('show dnode 1 variables')
for i in tdSql.queryResult:
if i[1].lower() == "gitinfo":
taosd_gitinfo_sql = f"gitinfo: {i[2]}"
taos_gitinfo_sql = ''
tdSql.query('show local variables')
for i in tdSql.queryResult:
if i[0].lower() == "gitinfo":
taos_gitinfo_sql = f"gitinfo: {i[1]}"
taos_info = os.popen('taos -V').read()
taos_gitinfo = re.findall("^gitinfo.*",taos_info,re.M)
tdSql.checkEqual(taos_gitinfo_sql,taos_gitinfo[0])
taosd_info = os.popen('taosd -V').read()
taosd_gitinfo = re.findall("^gitinfo.*",taosd_info,re.M)
tdSql.checkEqual(taosd_gitinfo_sql,taosd_gitinfo[0])
def run(self): def run(self):
self.check_gitinfo()
self.ins_check() self.ins_check()
self.perf_check() self.perf_check()
self.show_sql() self.show_sql()

View File

@ -0,0 +1,47 @@
from wsgiref.headers import tspecials
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.rowNum = 10
self.batchNum = 5
self.ts = 1537146000000
def run(self):
dbname = "db"
tdSql.prepare()
intData = []
floatData = []
tdSql.execute(f'''create table {dbname}.stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
tdSql.execute(f"create table {dbname}.stb_1 using {dbname}.stb tags('beijing')")
for n in range(self.batchNum):
for i in range(self.rowNum):
tdSql.execute(f"insert into {dbname}.stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
intData.append(i + 1)
floatData.append(i + 0.1)
tdSql.execute(f"flush database {dbname}")
tdSql.query(f"select * from {dbname}.stb_1 order by ts desc")
tdSql.checkRows(1)
#tdSql.checkData(0,0,1537146000000)
tdSql.checkData(0,1,10)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -143,7 +143,8 @@ class TDTestCase:
stableName= '%s_%d'%(paraDict['stbName'],i) stableName= '%s_%d'%(paraDict['stbName'],i)
newTdSql=tdCom.newTdSql() newTdSql=tdCom.newTdSql()
threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
threads.append(threading.Thread(target=self.reCreateUser,args=(newTdSql,i,"user","passwd"))) createTdSql=tdCom.newTdSql()
threads.append(threading.Thread(target=self.reCreateUser,args=(createTdSql,i,"user","passwd")))
for tr in threads: for tr in threads:
tr.start() tr.start()

View File

@ -109,7 +109,7 @@ class TDTestCase:
'batchNum': 3000, 'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0, 'endTs': 0,
'pollDelay': 5, 'pollDelay': 15,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
@ -194,7 +194,7 @@ class TDTestCase:
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 5000, 'batchNum': 5000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 15,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
@ -296,7 +296,7 @@ class TDTestCase:
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 5000, 'batchNum': 5000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 15,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}

View File

@ -1048,26 +1048,28 @@ void *shellThreadLoop(void *arg) {
taosGetOldTerminalMode(); taosGetOldTerminalMode();
taosThreadCleanupPush(shellCleanup, NULL); taosThreadCleanupPush(shellCleanup, NULL);
char *command = taosMemoryMalloc(SHELL_MAX_COMMAND_SIZE);
if (command == NULL) {
printf("failed to malloc command\r\n");
return NULL;
}
do { do {
memset(command, 0, SHELL_MAX_COMMAND_SIZE); char *command = taosMemoryMalloc(SHELL_MAX_COMMAND_SIZE);
taosSetTerminalMode(); if (command == NULL) {
printf("failed to malloc command\r\n");
if (shellReadCommand(command) != 0) {
break; break;
} }
taosResetTerminalMode(); do {
} while (shellRunCommand(command, true) == 0); memset(command, 0, SHELL_MAX_COMMAND_SIZE);
taosSetTerminalMode();
taosMemoryFreeClear(command); if (shellReadCommand(command) != 0) {
shellWriteHistory(); break;
shellExit(); }
taosResetTerminalMode();
} while (shellRunCommand(command, true) == 0);
taosMemoryFreeClear(command);
shellWriteHistory();
shellExit();
} while (0);
taosThreadCleanupPop(1); taosThreadCleanupPop(1);
return NULL; return NULL;