Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-26174

This commit is contained in:
liuyao 2023-10-07 19:13:07 +08:00
commit c088ee7f91
35 changed files with 384 additions and 205 deletions

View File

@ -195,6 +195,20 @@ if (TD_LINUX)
ELSE()
set(TD_DEPS_DIR "x86")
ENDIF()
elseif (TD_DARWIN)
IF (TD_ARM_64 OR TD_ARM_32)
set(TD_DEPS_DIR "darwin/arm")
ELSE ()
set(TD_DEPS_DIR "darwin/x64")
ENDIF ()
elseif (TD_WINDOWS)
IF (TD_WINDOWS_64)
set(TD_DEPS_DIR "win/x64")
ELSEIF (TD_WINDOWS_32)
set(TD_DEPS_DIR "win/i386")
ENDIF ()
else ()
MESSAGE(FATAL_ERROR "unsupported platform")
endif()
MESSAGE(STATUS "DEPS_DIR: " ${TD_DEPS_DIR})

View File

@ -1,6 +1,7 @@
# curl
ExternalProject_Add(curl
URL https://curl.se/download/curl-8.2.1.tar.gz
URL_HASH MD5=b25588a43556068be05e1624e0e74d41
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
#GIT_REPOSITORY https://github.com/curl/curl.git

BIN
deps/arm/dm_static/libdmodule.a vendored Normal file

Binary file not shown.

BIN
deps/darwin/arm/dm_static/libdmodule.a vendored Normal file

Binary file not shown.

BIN
deps/darwin/x64/dm_static/libdmodule.a vendored Normal file

Binary file not shown.

BIN
deps/win/x64/dm_static/dmodule.lib vendored Normal file

Binary file not shown.

BIN
deps/x86/dm_static/libdmodule.a vendored Normal file

Binary file not shown.

View File

@ -33,6 +33,17 @@ int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#elif defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include <windows.h>
#define tsem_t HANDLE
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_wait(tsem_t *sem);
int tsem_timewait(tsem_t *sim, int64_t milis);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#else
#define tsem_t sem_t

View File

@ -1557,11 +1557,13 @@ STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
// SColData ========================================
void tColDataDestroy(void *ph) {
SColData *pColData = (SColData *)ph;
if (ph) {
SColData *pColData = (SColData *)ph;
tFree(pColData->pBitMap);
tFree(pColData->aOffset);
tFree(pColData->pData);
tFree(pColData->pBitMap);
tFree(pColData->aOffset);
tFree(pColData->pData);
}
}
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) {

View File

@ -3,11 +3,22 @@ add_library(mgmt_dnode STATIC ${MGMT_DNODE})
if (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG)
endif()
IF (NOT BUILD_DM_MODULE)
MESSAGE(STATUS "NOT BUILD_DM_MODULE")
target_link_directories(
mgmt_dnode
PUBLIC "${TD_SOURCE_DIR}/deps/${TD_DEPS_DIR}/dm_static"
)
ELSE()
MESSAGE(STATUS "BUILD_DM_MODULE")
ENDIF()
target_include_directories(
mgmt_dnode
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PUBLIC "${GRANT_CFG_INCLUDE_DIR}"
)
target_link_libraries(
mgmt_dnode node_util
mgmt_dnode node_util dmodule
)

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "libs/function/tudf.h"
static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if (dmStartStatusThread(pMgmt) != 0) {

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
#include "libs/function/tudf.h"
static int32_t qmRequire(const SMgmtInputOpt *pInput, bool *required) {
return dmReadFile(pInput->path, pInput->name, required);

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
#include "libs/function/function.h"
#include "libs/function/tudf.h"
static int32_t smRequire(const SMgmtInputOpt *pInput, bool *required) {
return dmReadFile(pInput->path, pInput->name, required);

View File

@ -17,6 +17,7 @@
#include "vmInt.h"
#include "tfs.h"
#include "vnd.h"
#include "libs/function/tudf.h"
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
STfs *pTfs = pMgmt->pTfs;

View File

@ -95,6 +95,10 @@ void dmCleanupDnode(SDnode *pDnode);
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
int32_t dmInitVars(SDnode *pDnode);
void dmClearVars(SDnode *pDnode);
int32_t dmInitModule(SDnode *pDnode);
bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper);
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);

View File

@ -16,24 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "audit.h"
#define STR_CASE_CMP(s, d) (0 == strcasecmp((s), (d)))
#define STR_STR_CMP(s, d) (strstr((s), (d)))
#define STR_INT_CMP(s, d, c) (taosStr2Int32(s, 0, 10) c(d))
#define STR_STR_SIGN ("ia")
#define DM_INIT_MON() \
do { \
code = (int32_t)(2147483648 | 298); \
strncpy(stName, tsVersionName, 64); \
monCfg.maxLogs = tsMonitorMaxLogs; \
monCfg.port = tsMonitorPort; \
monCfg.server = tsMonitorFqdn; \
monCfg.comp = tsMonitorComp; \
if (monInit(&monCfg) != 0) { \
if (terrno != 0) code = terrno; \
goto _exit; \
} \
} while (0)
#include "libs/function/tudf.h"
#define DM_INIT_AUDIT() \
do { \
@ -45,15 +28,7 @@
} \
} while (0)
#define DM_ERR_RTN(c) \
do { \
code = (c); \
goto _exit; \
} while (0)
static SDnode globalDnode = {0};
static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS",
"FreeBSD", "openSUSE", "SLES", "Fedora", "macOS"};
SDnode *dmInstance() { return &globalDnode; }
@ -76,30 +51,14 @@ static int32_t dmInitSystem() {
static int32_t dmInitMonitor() {
int32_t code = 0;
SMonCfg monCfg = {0};
char reName[64] = {0};
char stName[64] = {0};
char ver[64] = {0};
DM_INIT_MON();
if (STR_STR_CMP(stName, STR_STR_SIGN)) {
DM_ERR_RTN(0);
}
if (taosGetOsReleaseName(reName, stName, ver, 64) != 0) {
DM_ERR_RTN(code);
}
if (STR_CASE_CMP(stName, dmOS[0])) {
if (STR_INT_CMP(ver, 17, >)) {
DM_ERR_RTN(0);
}
} else if (STR_CASE_CMP(stName, dmOS[1])) {
if (STR_INT_CMP(ver, 6, >)) {
DM_ERR_RTN(0);
}
} else if (STR_STR_CMP(stName, dmOS[2]) || STR_STR_CMP(stName, dmOS[3]) || STR_STR_CMP(stName, dmOS[4]) ||
STR_STR_CMP(stName, dmOS[5]) || STR_STR_CMP(stName, dmOS[6]) || STR_STR_CMP(stName, dmOS[7]) ||
STR_STR_CMP(stName, dmOS[8]) || STR_STR_CMP(stName, dmOS[9])) {
DM_ERR_RTN(0);
monCfg.maxLogs = tsMonitorMaxLogs;
monCfg.port = tsMonitorPort;
monCfg.server = tsMonitorFqdn;
monCfg.comp = tsMonitorComp;
if (monInit(&monCfg) != 0) {
if (terrno != 0) code = terrno;
goto _exit;
}
_exit:

View File

@ -24,88 +24,6 @@
#include "tcompression.h"
#endif
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
bool required = false;
int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
if (!required) {
dDebug("node:%s, does not require startup", pWrapper->name);
} else {
dDebug("node:%s, required to startup", pWrapper->name);
}
return required;
}
static int32_t dmInitVars(SDnode *pDnode) {
SDnodeData *pData = &pDnode->data;
pData->dnodeId = 0;
pData->clusterId = 0;
pData->dnodeVer = 0;
pData->updateTime = 0;
pData->rebootTime = taosGetTimestampMs();
pData->dropped = 0;
pData->stopped = 0;
pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pData->dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadEps(pData) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (pData->dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
taosThreadRwlockInit(&pData->lock, NULL);
taosThreadMutexInit(&pDnode->mutex, NULL);
return 0;
}
static void dmClearVars(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
taosMemoryFreeClear(pWrapper->path);
taosThreadRwlockDestroy(&pWrapper->lock);
}
if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile);
taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL;
}
SDnodeData *pData = &pDnode->data;
taosThreadRwlockWrlock(&pData->lock);
if (pData->oldDnodeEps != NULL) {
if (dmWriteEps(pData) == 0) {
dmRemoveDnodePairs(pData);
}
taosArrayDestroy(pData->oldDnodeEps);
pData->oldDnodeEps = NULL;
}
if (pData->dnodeEps != NULL) {
taosArrayDestroy(pData->dnodeEps);
pData->dnodeEps = NULL;
}
if (pData->dnodeHash != NULL) {
taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL;
}
taosThreadRwlockUnlock(&pData->lock);
taosThreadRwlockDestroy(&pData->lock);
taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
}
int32_t dmInitDnode(SDnode *pDnode) {
dDebug("start to create dnode");
int32_t code = -1;
@ -143,22 +61,12 @@ int32_t dmInitDnode(SDnode *pDnode) {
pWrapper->required = dmRequireNode(pDnode, pWrapper);
}
if (dmInitMsgHandle(pDnode) != 0) {
dError("failed to init msg handles since %s", terrstr());
goto _OVER;
}
pDnode->lockfile = dmCheckRunning(tsDataDir);
if (pDnode->lockfile == NULL) {
goto _OVER;
}
if (dmInitServer(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
goto _OVER;
}
if (dmInitClient(pDnode) != 0) {
if(dmInitModule(pDnode) != 0) {
goto _OVER;
}

View File

@ -40,7 +40,6 @@
#include "tfs.h"
#include "wal.h"
#include "libs/function/tudf.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -94,6 +93,7 @@ typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef struct {
int32_t dnodeId;
int32_t engineVer;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
@ -172,6 +172,9 @@ int32_t dmReadFile(const char *path, const char *name, bool *pDeployed);
int32_t dmWriteFile(const char *path, const char *name, bool deployed);
TdFilePtr dmCheckRunning(const char *dataDir);
// dmodule.c
int32_t dmInitDndInfo(SDnodeData *pData);
// dmEps.c
int32_t dmReadEps(SDnodeData *pData);
int32_t dmWriteEps(SDnodeData *pData);

View File

@ -57,6 +57,8 @@ static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "dnodeVer", pData->dnodeVer, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "engineVer", pData->engineVer, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "clusterId", pData->clusterId, code);
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(pJson, "dropped", pData->dropped, code);
@ -96,7 +98,8 @@ int32_t dmReadEps(SDnodeData *pData) {
pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pData->dnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno));
code = terrno;
dError("failed to calloc dnodeEp array since %s", terrstr());
goto _OVER;
}
@ -184,6 +187,7 @@ _OVER:
static int32_t dmEncodeEps(SJson *pJson, SDnodeData *pData) {
if (tjsonAddDoubleToObject(pJson, "dnodeId", pData->dnodeId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "dnodeVer", pData->dnodeVer) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "engineVer", pData->engineVer) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "clusterId", pData->clusterId) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "dropped", pData->dropped) < 0) return -1;
@ -218,8 +222,11 @@ int32_t dmWriteEps(SDnodeData *pData) {
snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
terrno = TSDB_CODE_OUT_OF_MEMORY;
if((code == dmInitDndInfo(pData)) != 0) goto _OVER;
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
pData->engineVer = tsVersion;
if (dmEncodeEps(pJson, pData) != 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;

View File

@ -47,7 +47,7 @@ void mndUserFreeObj(SUserObj *pUser);
int64_t mndGetIpWhiteVer(SMnode *pMnode);
void mndUpdateIpWhite(SMnode *pMnode, char *user, char *fqdn, int8_t type, int8_t lock);
void mndUpdateIpWhiteForAllUser(SMnode *pMnode, char *user, char *fqdn, int8_t type, int8_t lock);
int32_t mndRefreshUserIpWhiteList(SMnode *pMnode);

View File

@ -294,6 +294,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0;
pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1;
pOld->cfg.daysToKeep2 = pNew->cfg.daysToKeep2;
pOld->cfg.keepTimeOffset = pNew->cfg.keepTimeOffset;
pOld->cfg.walFsyncPeriod = pNew->cfg.walFsyncPeriod;
pOld->cfg.walLevel = pNew->cfg.walLevel;
pOld->cfg.walRetentionPeriod = pNew->cfg.walRetentionPeriod;

View File

@ -147,7 +147,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER:
mndTransDrop(pTrans);
@ -752,7 +752,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
@ -1045,7 +1045,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndUpdateIpWhite(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP, 1);
mndUpdateIpWhiteForAllUser(pMnode, TSDB_DEFAULT_USER, pDnode->fqdn, IP_WHITE_DROP, 1);
code = 0;
_OVER:
@ -1191,7 +1191,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy(dcfgReq.config, "monitor");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
int32_t optLen = strlen("ttlpushinterval");
int32_t flag = -1;
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);

View File

@ -58,6 +58,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBloc
static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter);
SHashObj *mndFetchAllIpWhite(SMnode *pMnode);
static int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq);
bool mndUpdateIpWhiteImpl(SHashObj *pIpWhiteTab, char *user, char *fqdn, int8_t type);
void ipWhiteMgtUpdateAll(SMnode *pMnode);
typedef struct {
@ -80,7 +81,7 @@ void ipWhiteMgtCleanup() {
taosThreadRwlockDestroy(&ipWhiteMgt.rw);
}
int32_t ipWhiteMgtUpdate(char *user, SIpWhiteList *pNew) {
int32_t ipWhiteMgtUpdate(SMnode *pMnode, char *user, SIpWhiteList *pNew) {
bool update = true;
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
SIpWhiteList **ppList = taosHashGet(ipWhiteMgt.pIpWhiteTab, user, strlen(user));
@ -98,6 +99,25 @@ int32_t ipWhiteMgtUpdate(char *user, SIpWhiteList *pNew) {
taosHashPut(ipWhiteMgt.pIpWhiteTab, user, strlen(user), &p, sizeof(void *));
}
}
SArray *fqdns = mndGetAllDnodeFqdns(pMnode);
for (int i = 0; i < taosArrayGetSize(fqdns); i++) {
char *fqdn = taosArrayGetP(fqdns, i);
update |= mndUpdateIpWhiteImpl(ipWhiteMgt.pIpWhiteTab, TSDB_DEFAULT_USER, fqdn, IP_WHITE_ADD);
update |= mndUpdateIpWhiteImpl(ipWhiteMgt.pIpWhiteTab, user, fqdn, IP_WHITE_ADD);
}
for (int i = 0; i < taosArrayGetSize(fqdns); i++) {
char *fqdn = taosArrayGetP(fqdns, i);
taosMemoryFree(fqdn);
}
taosArrayDestroy(fqdns);
// for (int i = 0; i < taosArrayGetSize(pUserNames); i++) {
// taosMemoryFree(taosArrayGetP(pUserNames, i));
// }
// taosArrayDestroy(pUserNames);
if (update) ipWhiteMgt.ver++;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
@ -282,7 +302,7 @@ int32_t mndRefreshUserIpWhiteList(SMnode *pMnode) {
return 0;
}
void mndUpdateIpWhite(SMnode *pMnode, char *user, char *fqdn, int8_t type, int8_t lock) {
void mndUpdateIpWhiteForAllUser(SMnode *pMnode, char *user, char *fqdn, int8_t type, int8_t lock) {
if (lock) {
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
if (ipWhiteMgt.ver == 0) {
@ -293,6 +313,20 @@ void mndUpdateIpWhite(SMnode *pMnode, char *user, char *fqdn, int8_t type, int8_
}
bool update = mndUpdateIpWhiteImpl(ipWhiteMgt.pIpWhiteTab, user, fqdn, type);
void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteTab, NULL);
while (pIter) {
size_t klen = 0;
char *key = taosHashGetKey(pIter, &klen);
char *keyDup = taosMemoryCalloc(1, klen + 1);
memcpy(keyDup, key, klen);
update |= mndUpdateIpWhiteImpl(ipWhiteMgt.pIpWhiteTab, keyDup, fqdn, type);
taosMemoryFree(keyDup);
pIter = taosHashIterate(ipWhiteMgt.pIpWhiteTab, pIter);
}
if (update) ipWhiteMgt.ver++;
if (lock) taosThreadRwlockUnlock(&ipWhiteMgt.rw);
@ -1178,7 +1212,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
mndTransDrop(pTrans);
goto _OVER;
}
ipWhiteMgtUpdate(userObj.user, userObj.pIpWhiteList);
ipWhiteMgtUpdate(pMnode, userObj.user, userObj.pIpWhiteList);
taosMemoryFree(userObj.pIpWhiteList);
mndTransDrop(pTrans);
@ -1346,7 +1380,7 @@ static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpc
mndTransDrop(pTrans);
return -1;
}
ipWhiteMgtUpdate(pNew->user, pNew->pIpWhiteList);
ipWhiteMgtUpdate(pMnode, pNew->user, pNew->pIpWhiteList);
mndTransDrop(pTrans);
return 0;
}

View File

@ -57,7 +57,7 @@ static int32_t tsdbOpenBCache(STsdb *pTsdb) {
// SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
SLRUCache *pCache = taosLRUCacheInit(tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5);
SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -3100,7 +3100,7 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
taosThreadMutexUnlock(&pTsdb->bMutex);
*handle = NULL;
if (!pBlock) {
if (code == TSDB_CODE_SUCCESS && !pBlock) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
return code;

View File

@ -4882,6 +4882,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
// alloc
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
if (pSnap == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
@ -4891,6 +4892,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pMem = pTsdb->mem;
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
if (pSnap->pNode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
@ -4905,6 +4907,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pIMem = pTsdb->imem;
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
if (pSnap->pINode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
@ -4915,6 +4918,9 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
}
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
// fs
code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray);
if (code == TSDB_CODE_SUCCESS) {
@ -4922,8 +4928,6 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
}
_exit:
taosThreadRwlockUnlock(&pTsdb->rwLock);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code));

View File

@ -63,7 +63,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
}
// not check file size when reading data files.
if (flag != TD_FILE_READ) {
if (flag != TD_FILE_READ && !pFD->s3File) {
if (taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
// taosMemoryFree(pFD->pBuf);
@ -130,6 +130,9 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
}
}
if (pFD->s3File) {
return code;
}
if (pFD->pgno > 0) {
int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
if (n < 0) {
@ -172,10 +175,10 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
LRUHandle *handle = NULL;
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
int32_t code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
if (!handle) {
if (code == TSDB_CODE_SUCCESS && !handle) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
goto _exit;
@ -282,6 +285,9 @@ _exit:
int32_t tsdbFsyncFile(STsdbFD *pFD) {
int32_t code = 0;
if (pFD->s3File) {
return code;
}
code = tsdbWriteFilePage(pFD);
if (code) goto _exit;

View File

@ -111,15 +111,14 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) {
clt_params = cos_create_resumable_clt_params_content(p, 1024 * 1024, 8, COS_FALSE, NULL);
s = cos_resumable_upload_file(options, &bucket, &object, &file, headers, NULL, clt_params, NULL, &resp_headers, NULL);
log_status(s);
if (!cos_status_is_ok(s)) {
vError("s3: %s", s->error_msg);
vError("s3: %d(%s)", s->code, s->error_msg);
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
code = terrno;
code = TAOS_SYSTEM_ERROR(EIO);
return code;
}
log_status(s);
cos_pool_destroy(p);
if (s->code != 200) {
@ -303,7 +302,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_
s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers);
log_status(s);
if (!cos_status_is_ok(s)) {
vError("s3: %s", s->error_msg);
vError("s3: %d(%s)", s->code, s->error_msg);
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
code = TAOS_SYSTEM_ERROR(EIO);
return code;

View File

@ -8018,9 +8018,29 @@ static int32_t insertCondIntoSelectStmt(SSelectStmt* pSelect, SNode* pCond) {
if (pSelect->pWhere == NULL) {
pSelect->pWhere = pCond;
} else {
SNode* pWhere = NULL;
createLogicCondNode(pSelect->pWhere, pCond, &pWhere, LOGIC_COND_TYPE_AND);
pSelect->pWhere = pWhere;
SNodeList* pLogicCondListWhere = NULL;
SNodeList* pLogicCondList2 = NULL;
if (nodeType(pSelect->pWhere) == QUERY_NODE_LOGIC_CONDITION &&
((SLogicConditionNode*)pSelect->pWhere)->condType == LOGIC_COND_TYPE_AND) {
pLogicCondListWhere = ((SLogicConditionNode*)pSelect->pWhere)->pParameterList;
} else {
nodesListMakeAppend(&pLogicCondListWhere, pSelect->pWhere);
}
if (nodeType(pCond) == QUERY_NODE_LOGIC_CONDITION &&
((SLogicConditionNode*)pCond)->condType == LOGIC_COND_TYPE_AND) {
pLogicCondList2 = ((SLogicConditionNode*)pCond)->pParameterList;
} else {
nodesListMakeAppend(&pLogicCondList2, pCond);
}
nodesListAppendList(pLogicCondListWhere, pLogicCondList2);
SLogicConditionNode* pWhere = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
pWhere->condType = LOGIC_COND_TYPE_AND;
pWhere->pParameterList = pLogicCondListWhere;
pSelect->pWhere = (SNode*)pWhere;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -68,37 +68,34 @@ int32_t taosGetAppName(char* name, int32_t* len) {
}
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
return WaitForSingleObject(*sem, INFINITE);
}
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
struct timespec ts;
taosClockGetTime(0, &ts);
int32_t tsem_timewait(tsem_t* sem, int64_t timeout_ms) {
DWORD result = WaitForSingleObject(*sem, timeout_ms);
if (result == WAIT_OBJECT_0) {
return 0; // Semaphore acquired
} else if (result == WAIT_TIMEOUT) {
return -1; // Timeout reached
} else {
return result;
}
}
ts.tv_nsec += ms * 1000000;
ts.tv_sec += ts.tv_nsec / 1000000000;
ts.tv_nsec %= 1000000000;
int rc;
while ((rc = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue;
return rc;
/* This should have timed out */
// ASSERT(errno == ETIMEDOUT);
// ASSERT(rc != 0);
// GetSystemTimeAsFileTime(&ft_after);
// // We specified a non-zero wait. Time must advance.
// if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime)
// {
// printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n",
// nanosecs, rc, errno,
// (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime,
// (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime);
// printf("time must advance during sem_timedwait.");
// return 1;
// }
// Inter-process sharing is not currently supported. The pshared parameter is invalid.
int tsem_init(tsem_t* sem, int pshared, unsigned int value) {
*sem = CreateSemaphore(NULL, value, LONG_MAX, NULL);
return (*sem != NULL) ? 0 : -1;
}
int tsem_post(tsem_t* sem) {
if (ReleaseSemaphore(*sem, 1, NULL)) return 0;
return -1;
}
int tsem_destroy(tsem_t* sem) {
if (CloseHandle(*sem)) return 0;
return -1;
}
#elif defined(_TD_DARWIN_64)
@ -133,8 +130,7 @@ int tsem_wait(tsem_t *psem) {
int tsem_timewait(tsem_t *psem, int64_t milis) {
if (psem == NULL || *psem == NULL) return -1;
dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
dispatch_semaphore_wait(*psem, time);
return 0;
return dispatch_semaphore_wait(*psem, time);
}
bool taosCheckPthreadValid(TdThread thread) { return thread != 0; }

View File

@ -77,4 +77,11 @@ target_link_libraries(osAtomicTests os util gtest_main)
add_test(
NAME osAtomicTests
COMMAND osAtomicTests
)
)
add_executable(osSemaphoreTests "osSemaphoreTests.cpp")
target_link_libraries(osSemaphoreTests os util gtest_main)
add_test(
NAME osSemaphoreTests
COMMAND osSemaphoreTests
)

View File

@ -0,0 +1,108 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <xsren@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <thread>
#include "os.h"
#include "tlog.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wformat"
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#pragma GCC diagnostic ignored "-Wpointer-arith"
TEST(osSemaphoreTests, InitAndDestroy) {
tsem_t sem;
int result = tsem_init(&sem, 0, 1);
EXPECT_EQ(result, 0);
result = tsem_destroy(&sem);
EXPECT_EQ(result, 0);
}
TEST(osSemaphoreTests, Destroy) {
tsem_t sem;
int result = tsem_init(&sem, 0, 1);
EXPECT_EQ(result, 0);
result = tsem_destroy(&sem);
EXPECT_EQ(result, 0);
// result = tsem_destroy(&sem);
// EXPECT_NE(result, 0); // result == 0 if on mac
}
// skip, tsem_wait can not stopped, will block test.
// TEST(osSemaphoreTests, Wait) {
// tsem_t sem;
// tsem_init(&sem, 0, 0);
// ASSERT_EQ(tsem_wait(&sem), -1);
// tsem_destroy(&sem);
// }
TEST(osSemaphoreTests, WaitTime0) {
tsem_t sem;
tsem_init(&sem, 0, 0);
EXPECT_NE(tsem_timewait(&sem, 1000), 0);
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, WaitTime1) {
tsem_t sem;
tsem_init(&sem, 0, 1);
EXPECT_EQ(tsem_timewait(&sem, 1000), 0);
EXPECT_NE(tsem_timewait(&sem, 1000), 0);
tsem_destroy(&sem);
}
TEST(osSemaphoreTests, WaitAndPost) {
tsem_t sem;
int result = tsem_init(&sem, 0, 0);
EXPECT_EQ(result, 0);
std::thread([&sem]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
tsem_post(&sem);
}).detach();
result = tsem_wait(&sem);
EXPECT_EQ(result, 0);
result = tsem_destroy(&sem);
EXPECT_EQ(result, 0);
}
TEST(osSemaphoreTests, TimedWait) {
tsem_t sem;
int result = tsem_init(&sem, 0, 0);
EXPECT_EQ(result, 0);
std::thread([&sem]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
tsem_post(&sem);
}).detach();
result = tsem_timewait(&sem, 1000);
EXPECT_EQ(result, 0);
result = tsem_destroy(&sem);
EXPECT_EQ(result, 0);
}

View File

@ -815,6 +815,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
,,n,system-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insertMix.py -N 3
,,n,system-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/stt.py -N 3
,,n,system-test,python3 ./test.py -f eco-system/meta/database/keep_time_offset.py
#tsim test
,,y,script,./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim

View File

@ -94,4 +94,14 @@ print $rows
if $rows != 5 then
return -1
endi
sql show child db2.tables like '%'
print $rows
if $rows != 5 then
return -1
endi
sql show normal db2.tables like '%'
print $rows
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,66 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import re
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
def create_db(self):
hours = 8
# create
keep_str = f"KEEP_TIME_OFFSET {hours}"
tdSql.execute(f"create database db {keep_str}")
# check result
tdSql.query("select `keep_time_offset` from information_schema.ins_databases where name='db'")
tdSql.checkData(0, 0, hours)
# alter
hours = 4
keep_str = f"KEEP_TIME_OFFSET {hours}"
tdSql.execute(f"alter database db {keep_str}")
# check result
tdSql.query("select `keep_time_offset` from information_schema.ins_databases where name='db'")
tdSql.checkData(0, 0, hours)
def check_old_syntax(self):
# old syntax would not support again
tdSql.error("alter dnode 1 'keeptimeoffset 10';")
def run(self):
# check new syntax right
self.create_db()
# check old syntax error
self.check_old_syntax()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -125,7 +125,10 @@ void build_code(HuffmanTree *huffmanTree, node n, int len, unsigned long out1, u
huffmanTree->code[n->c] = (unsigned long*)malloc(2*sizeof(unsigned long));
if(len<=64)
{
(huffmanTree->code[n->c])[0] = out1 << (64 - len);
if(len == 0)
(huffmanTree->code[n->c])[0] = 0;
else
(huffmanTree->code[n->c])[0] = out1 << (64 - len);
(huffmanTree->code[n->c])[1] = out2;
}
else