Merge pull request #18809 from taosdata/enh/insertOptimize_wxy
enh: merge getmetafromcache and getvgroupfromcache interface
This commit is contained in:
commit
0c120d3380
|
@ -62,6 +62,7 @@ int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen);
|
||||||
bool taosIsDir(const char *dirname);
|
bool taosIsDir(const char *dirname);
|
||||||
char *taosDirName(char *dirname);
|
char *taosDirName(char *dirname);
|
||||||
char *taosDirEntryBaseName(char *dirname);
|
char *taosDirEntryBaseName(char *dirname);
|
||||||
|
void taosGetCwd(char *buf, int32_t len);
|
||||||
|
|
||||||
TdDirPtr taosOpenDir(const char *dirname);
|
TdDirPtr taosOpenDir(const char *dirname);
|
||||||
TdDirEntryPtr taosReadDir(TdDirPtr pDir);
|
TdDirEntryPtr taosReadDir(TdDirPtr pDir);
|
||||||
|
|
|
@ -62,8 +62,11 @@ void taosResetTerminalMode();
|
||||||
taosMemoryFree(strings); \
|
taosMemoryFree(strings); \
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
#define taosPrintTrace(flags, level, dflag) \
|
#define taosPrintTrace(flags, level, dflag) \
|
||||||
{}
|
{ \
|
||||||
|
taosPrintLog(flags, level, dflag, \
|
||||||
|
"backtrace not implemented on windows, so detailed stack information cannot be printed"); \
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -307,8 +307,9 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_MIN_DURATION_PER_FILE 60 // unit minute
|
#define TSDB_MIN_DURATION_PER_FILE 60 // unit minute
|
||||||
#define TSDB_MAX_DURATION_PER_FILE (3650 * 1440)
|
#define TSDB_MAX_DURATION_PER_FILE (3650 * 1440)
|
||||||
#define TSDB_DEFAULT_DURATION_PER_FILE (10 * 1440)
|
#define TSDB_DEFAULT_DURATION_PER_FILE (10 * 1440)
|
||||||
#define TSDB_MIN_KEEP (1 * 1440) // data in db to be reserved. unit minute
|
#define TSDB_MIN_KEEP (1 * 1440) // data in db to be reserved. unit minute
|
||||||
#define TSDB_MAX_KEEP (365000 * 1440) // data in db to be reserved.
|
#define TSDB_MAX_KEEP (365000 * 1440) // data in db to be reserved.
|
||||||
|
#define TSDB_MAX_KEEP_NS (365 * 292 * 1440) // data in db to be reserved.
|
||||||
#define TSDB_DEFAULT_KEEP (3650 * 1440) // ten years
|
#define TSDB_DEFAULT_KEEP (3650 * 1440) // ten years
|
||||||
#define TSDB_MIN_MINROWS_FBLOCK 10
|
#define TSDB_MIN_MINROWS_FBLOCK 10
|
||||||
#define TSDB_MAX_MINROWS_FBLOCK 1000
|
#define TSDB_MAX_MINROWS_FBLOCK 1000
|
||||||
|
|
|
@ -83,8 +83,8 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
|
||||||
#endif
|
#endif
|
||||||
;
|
;
|
||||||
|
|
||||||
bool taosAssertLog(bool condition, const char *file, int32_t line, const char *format, ...);
|
bool taosAssert(bool condition, const char *file, int32_t line, const char *format, ...);
|
||||||
#define ASSERTS(condition, ...) taosAssertLog(condition, __FILE__, __LINE__, __VA_ARGS__)
|
#define ASSERTS(condition, ...) taosAssert(condition, __FILE__, __LINE__, __VA_ARGS__)
|
||||||
#define ASSERT(condition) ASSERTS(condition, "assert info not provided")
|
#define ASSERT(condition) ASSERTS(condition, "assert info not provided")
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
|
|
|
@ -875,6 +875,7 @@ void tmqFreeImpl(void* handle) {
|
||||||
tmq_t* tmq = (tmq_t*)handle;
|
tmq_t* tmq = (tmq_t*)handle;
|
||||||
|
|
||||||
// TODO stop timer
|
// TODO stop timer
|
||||||
|
tmqClearUnhandleMsg(tmq);
|
||||||
if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
|
if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
|
||||||
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
|
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
|
||||||
if (tmq->qall) taosFreeQall(tmq->qall);
|
if (tmq->qall) taosFreeQall(tmq->qall);
|
||||||
|
@ -884,8 +885,7 @@ void tmqFreeImpl(void* handle) {
|
||||||
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
|
||||||
taosArrayDestroy(pTopic->vgs);
|
taosArrayDestroy(pTopic->vgs);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tmq->clientTopics);
|
taosArrayDestroy(tmq->clientTopics);
|
||||||
|
@ -1304,7 +1304,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
|
||||||
taosArrayDestroy(pTopic->vgs);
|
taosArrayDestroy(pTopic->vgs);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tmq->clientTopics);
|
taosArrayDestroy(tmq->clientTopics);
|
||||||
|
@ -1410,7 +1409,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void* pReq = taosMemoryCalloc(1, tlen);
|
void* pReq = taosMemoryCalloc(1, tlen);
|
||||||
if (tlen < 0) {
|
if (pReq == NULL) {
|
||||||
tscError("failed to malloc askEpReq msg, size:%d", tlen);
|
tscError("failed to malloc askEpReq msg, size:%d", tlen);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1738,7 +1737,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
#include "tconfig.h"
|
#include "tconfig.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
|
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
|
||||||
|
@ -45,9 +46,30 @@ static struct {
|
||||||
SArray *pArgs; // SConfigPair
|
SArray *pArgs; // SConfigPair
|
||||||
} global = {0};
|
} global = {0};
|
||||||
|
|
||||||
static void dmStopDnode(int signum, void *info, void *ctx) { dmStop(); }
|
static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetAllDebugFlag(143, true); }
|
||||||
|
static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert = 1; }
|
||||||
|
|
||||||
|
static void dmStopDnode(int signum, void *sigInfo, void *context) {
|
||||||
|
// taosIgnSignal(SIGUSR1);
|
||||||
|
// taosIgnSignal(SIGUSR2);
|
||||||
|
taosIgnSignal(SIGTERM);
|
||||||
|
taosIgnSignal(SIGHUP);
|
||||||
|
taosIgnSignal(SIGINT);
|
||||||
|
taosIgnSignal(SIGABRT);
|
||||||
|
taosIgnSignal(SIGBREAK);
|
||||||
|
|
||||||
|
dInfo("shut down signal is %d", signum);
|
||||||
|
#ifndef WINDOWS
|
||||||
|
dInfo("sender PID:%d cmdline:%s", ((siginfo_t *)sigInfo)->si_pid,
|
||||||
|
taosGetCmdlineByPID(((siginfo_t *)sigInfo)->si_pid));
|
||||||
|
#endif
|
||||||
|
|
||||||
|
dmStop();
|
||||||
|
}
|
||||||
|
|
||||||
static void dmSetSignalHandle() {
|
static void dmSetSignalHandle() {
|
||||||
|
taosSetSignal(SIGUSR1, dmSetDebugFlag);
|
||||||
|
taosSetSignal(SIGUSR2, dmSetAssert);
|
||||||
taosSetSignal(SIGTERM, dmStopDnode);
|
taosSetSignal(SIGTERM, dmStopDnode);
|
||||||
taosSetSignal(SIGHUP, dmStopDnode);
|
taosSetSignal(SIGHUP, dmStopDnode);
|
||||||
taosSetSignal(SIGINT, dmStopDnode);
|
taosSetSignal(SIGINT, dmStopDnode);
|
||||||
|
@ -105,6 +127,19 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dmPrintArgs(int32_t argc, char const *argv[]) {
|
||||||
|
char path[1024] = {0};
|
||||||
|
taosGetCwd(path, sizeof(path));
|
||||||
|
|
||||||
|
char args[1024] = {0};
|
||||||
|
int32_t arglen = snprintf(args, sizeof(args), "%s", argv[0]);
|
||||||
|
for (int32_t i = 1; i < argc; ++i) {
|
||||||
|
arglen = arglen + snprintf(args + arglen, sizeof(args) - arglen, " %s", argv[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("startup path:%s args:%s", path, args);
|
||||||
|
}
|
||||||
|
|
||||||
static void dmGenerateGrant() { mndGenerateMachineCode(); }
|
static void dmGenerateGrant() { mndGenerateMachineCode(); }
|
||||||
|
|
||||||
static void dmPrintVersion() {
|
static void dmPrintVersion() {
|
||||||
|
@ -194,6 +229,8 @@ int mainWindows(int argc, char **argv) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dmPrintArgs(argc, argv);
|
||||||
|
|
||||||
if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0) != 0) {
|
if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0) != 0) {
|
||||||
dError("failed to start since read config error");
|
dError("failed to start since read config error");
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
|
@ -825,7 +825,13 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
||||||
dbObj.cfgVersion++;
|
dbObj.cfgVersion++;
|
||||||
dbObj.updateTime = taosGetTimestampMs();
|
dbObj.updateTime = taosGetTimestampMs();
|
||||||
code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
|
code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
|
||||||
|
if (dbObj.cfg.replications != pDb->cfg.replications) {
|
||||||
|
// return quickly, operation executed asynchronously
|
||||||
|
mInfo("db:%s, alter db replica from %d to %d", pDb->name, pDb->cfg.replications, dbObj.cfg.replications);
|
||||||
|
} else {
|
||||||
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
|
|
@ -174,7 +174,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
|
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockSMA, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave);
|
||||||
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
|
|
|
@ -1353,6 +1353,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (stbEntry.stbEntry.schemaTag.pSchema == NULL) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
|
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
|
||||||
|
|
||||||
STagVal tagVal = {.cid = pTagColumn->colId};
|
STagVal tagVal = {.cid = pTagColumn->colId};
|
||||||
|
|
|
@ -952,7 +952,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
|
SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
|
||||||
|
|
||||||
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
|
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
|
||||||
if (code) goto _err;
|
if (code) {
|
||||||
|
tsdbDelFReaderClose(&pDelFReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ);
|
SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ);
|
||||||
|
|
||||||
|
|
|
@ -962,6 +962,7 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pDFileSet->diskId = pSet->diskId;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4112,8 +4112,9 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockSMA, bool* allHave) {
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SColumnDataAgg ***pBlockSMA = &pDataBlock->pBlockAgg;
|
||||||
*allHave = false;
|
*allHave = false;
|
||||||
|
|
||||||
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
|
@ -4161,6 +4162,12 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
size_t size = taosArrayGetSize(pSup->pColAgg);
|
size_t size = taosArrayGetSize(pSup->pColAgg);
|
||||||
|
|
||||||
|
// ensure capacity
|
||||||
|
if(pDataBlock->pDataBlock) {
|
||||||
|
size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
|
taosArrayEnsureCap(pSup->pColAgg, colsNum);
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
if (pResBlock->pBlockAgg == NULL) {
|
if (pResBlock->pBlockAgg == NULL) {
|
||||||
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
|
||||||
|
|
|
@ -224,7 +224,7 @@ static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsA
|
||||||
|
|
||||||
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
bool allColumnsHaveAgg = true;
|
bool allColumnsHaveAgg = true;
|
||||||
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pBlock->pBlockAgg, &allColumnsHaveAgg);
|
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -863,19 +863,20 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
|
||||||
|
|
||||||
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
||||||
SArray* res = (SArray*)data;
|
SArray* res = (SArray*)data;
|
||||||
SWinKey* pos = taosArrayGet(res, index);
|
SWinKey* pDataPos = taosArrayGet(res, index);
|
||||||
SResKeyPos* pData = (SResKeyPos*)pKey;
|
SResKeyPos* pRKey = (SResKeyPos*)pKey;
|
||||||
if (*(int64_t*)pData->key == pos->ts) {
|
if (pRKey->groupId > pDataPos->groupId) {
|
||||||
if (pData->groupId > pos->groupId) {
|
|
||||||
return 1;
|
|
||||||
} else if (pData->groupId < pos->groupId) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
} else if (*(int64_t*)pData->key > pos->ts) {
|
|
||||||
return 1;
|
return 1;
|
||||||
|
} else if (pRKey->groupId < pDataPos->groupId) {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
return -1;
|
|
||||||
|
if (*(int64_t*)pRKey->key > pDataPos->ts) {
|
||||||
|
return 1;
|
||||||
|
} else if (*(int64_t*)pRKey->key < pDataPos->ts){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
||||||
|
@ -1400,19 +1401,21 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
||||||
|
|
||||||
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
|
||||||
SArray* res = (SArray*)data;
|
SArray* res = (SArray*)data;
|
||||||
SWinKey* pos = taosArrayGet(res, index);
|
SWinKey* pDataPos = taosArrayGet(res, index);
|
||||||
SWinKey* pData = (SWinKey*)pKey;
|
SWinKey* pWKey = (SWinKey*)pKey;
|
||||||
if (pData->ts == pos->ts) {
|
|
||||||
if (pData->groupId > pos->groupId) {
|
if (pWKey->groupId > pDataPos->groupId) {
|
||||||
return 1;
|
|
||||||
} else if (pData->groupId < pos->groupId) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
} else if (pData->ts > pos->ts) {
|
|
||||||
return 1;
|
return 1;
|
||||||
|
} else if (pWKey->groupId < pDataPos->groupId) {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
return -1;
|
|
||||||
|
if (pWKey->ts > pDataPos->ts) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWKey->ts < pDataPos->ts) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
|
||||||
|
|
|
@ -368,11 +368,13 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
|
||||||
pSlot->info.data = NULL;
|
pSlot->info.data = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *pPageIdList = (SArray *)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
|
SArray *pPageIdList;
|
||||||
if (pPageIdList == NULL) {
|
void *p = taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
|
||||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
if (p == NULL) {
|
||||||
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES);
|
pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||||
pPageIdList = pList;
|
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pPageIdList, POINTER_BYTES);
|
||||||
|
} else {
|
||||||
|
pPageIdList = *(SArray **)p;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
|
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
|
||||||
|
|
|
@ -832,13 +832,12 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
|
||||||
|
|
||||||
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
bool exists = true;
|
int32_t code = catalogGetCachedTableVgMeta(pCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
|
||||||
int32_t code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (exists) {
|
if (NULL != pStmt->pTableMeta) {
|
||||||
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
||||||
}
|
}
|
||||||
*pMissCache = !exists;
|
*pMissCache = (NULL == pStmt->pTableMeta);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -857,6 +856,18 @@ static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStm
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
|
||||||
|
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(pName, fullName);
|
||||||
|
return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(pName, dbFName);
|
||||||
|
return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName));
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||||
if (pCxt->forceUpdate) {
|
if (pCxt->forceUpdate) {
|
||||||
pCxt->missCache = true;
|
pCxt->missCache = true;
|
||||||
|
@ -864,15 +875,24 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||||
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
#if 0
|
||||||
// code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
// }
|
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||||
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
}
|
||||||
// code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
// }
|
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||||
|
}
|
||||||
|
#else
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
|
code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
|
code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1807,16 +1827,25 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR
|
||||||
|
|
||||||
static int32_t setRefreshMate(SQuery* pQuery) {
|
static int32_t setRefreshMate(SQuery* pQuery) {
|
||||||
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||||
SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
|
|
||||||
while (NULL != pTable) {
|
if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
|
||||||
taosArrayPush(pQuery->pTableList, pTable);
|
taosArrayDestroy(pQuery->pTableList);
|
||||||
pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
|
pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName));
|
||||||
|
SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
|
||||||
|
while (NULL != pTable) {
|
||||||
|
taosArrayPush(pQuery->pTableList, pTable);
|
||||||
|
pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
|
if (taosHashGetSize(pStmt->pDbFNameHashObj) > 0) {
|
||||||
while (NULL != pDb) {
|
taosArrayDestroy(pQuery->pDbList);
|
||||||
taosArrayPush(pQuery->pDbList, pDb);
|
pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN);
|
||||||
pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
|
char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
|
||||||
|
while (NULL != pDb) {
|
||||||
|
taosArrayPush(pQuery->pDbList, pDb);
|
||||||
|
pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1930,16 +1959,17 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
||||||
|
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||||
if (pCxt->missCache) {
|
if (pCxt->missCache) {
|
||||||
parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId,
|
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted before cache miss", pCxt->pComCxt->requestId,
|
||||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
pStmt->totalRowsNum, pStmt->totalTbNum);
|
||||||
|
|
||||||
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
||||||
return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq);
|
return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId,
|
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
|
||||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
pStmt->totalTbNum);
|
||||||
|
|
||||||
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -3879,12 +3879,17 @@ static int32_t checkDbKeepOption(STranslateContext* pCxt, SDatabaseOptions* pOpt
|
||||||
pOptions->keep[2] = getBigintFromValueNode((SValueNode*)nodesListGetNode(pOptions->pKeep, 2));
|
pOptions->keep[2] = getBigintFromValueNode((SValueNode*)nodesListGetNode(pOptions->pKeep, 2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tsdbMaxKeep = TSDB_MAX_KEEP;
|
||||||
|
if (pOptions->precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
|
tsdbMaxKeep = TSDB_MAX_KEEP_NS;
|
||||||
|
}
|
||||||
|
|
||||||
if (pOptions->keep[0] < TSDB_MIN_KEEP || pOptions->keep[1] < TSDB_MIN_KEEP || pOptions->keep[2] < TSDB_MIN_KEEP ||
|
if (pOptions->keep[0] < TSDB_MIN_KEEP || pOptions->keep[1] < TSDB_MIN_KEEP || pOptions->keep[2] < TSDB_MIN_KEEP ||
|
||||||
pOptions->keep[0] > TSDB_MAX_KEEP || pOptions->keep[1] > TSDB_MAX_KEEP || pOptions->keep[2] > TSDB_MAX_KEEP) {
|
pOptions->keep[0] > tsdbMaxKeep || pOptions->keep[1] > tsdbMaxKeep || pOptions->keep[2] > tsdbMaxKeep) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
||||||
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
|
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
|
||||||
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP,
|
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP,
|
||||||
TSDB_MAX_KEEP);
|
tsdbMaxKeep);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) {
|
if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) {
|
||||||
|
@ -4036,7 +4041,10 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
|
||||||
TSDB_MAX_MINROWS_FBLOCK);
|
TSDB_MAX_MINROWS_FBLOCK);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkDbKeepOption(pCxt, pOptions);
|
code = checkDbPrecisionOption(pCxt, pOptions);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = checkDbKeepOption(pCxt, pOptions); // use precision
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE);
|
code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE);
|
||||||
|
@ -4049,9 +4057,6 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
|
||||||
code = checkDbRangeOption(pCxt, "tsdbPagesize", pOptions->tsdbPageSize, TSDB_MIN_TSDB_PAGESIZE,
|
code = checkDbRangeOption(pCxt, "tsdbPagesize", pOptions->tsdbPageSize, TSDB_MIN_TSDB_PAGESIZE,
|
||||||
TSDB_MAX_TSDB_PAGESIZE);
|
TSDB_MAX_TSDB_PAGESIZE);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = checkDbPrecisionOption(pCxt, pOptions);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkDbEnumOption(pCxt, "replications", pOptions->replica, TSDB_MIN_DB_REPLICA, TSDB_MAX_DB_REPLICA);
|
code = checkDbEnumOption(pCxt, "replications", pOptions->replica, TSDB_MIN_DB_REPLICA, TSDB_MAX_DB_REPLICA);
|
||||||
}
|
}
|
||||||
|
|
|
@ -497,3 +497,11 @@ int32_t taosCloseDir(TdDirPtr *ppDir) {
|
||||||
return 0;
|
return 0;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosGetCwd(char *buf, int32_t len) {
|
||||||
|
#if !defined(WINDOWS)
|
||||||
|
(void)getcwd(buf, len - 1);
|
||||||
|
#else
|
||||||
|
strncpy(buf, "not implemented on windows", len -1);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
|
@ -780,7 +780,7 @@ cmp_end:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool taosAssertLog(bool condition, const char *file, int32_t line, const char *format, ...) {
|
bool taosAssert(bool condition, const char *file, int32_t line, const char *format, ...) {
|
||||||
if (condition) return false;
|
if (condition) return false;
|
||||||
|
|
||||||
const char *flags = "UTL FATAL ";
|
const char *flags = "UTL FATAL ";
|
||||||
|
|
|
@ -116,6 +116,25 @@ endi
|
||||||
|
|
||||||
print ============= step4: alter database
|
print ============= step4: alter database
|
||||||
sql alter database db replica 3
|
sql alter database db replica 3
|
||||||
|
$wt = 0
|
||||||
|
stepwt1:
|
||||||
|
$wt = $wt + 1
|
||||||
|
sleep 1000
|
||||||
|
if $wt == 200 then
|
||||||
|
print ====> dnode not ready!
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql show transactions
|
||||||
|
if $rows != 0 then
|
||||||
|
print wait 1 seconds to alter
|
||||||
|
goto stepwt1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql show db.vgroups
|
||||||
|
print ---> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||||
|
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data27 $data28 $data29
|
||||||
|
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data26 $data37 $data38 $data39
|
||||||
|
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data36 $data47 $data48 $data49
|
||||||
|
|
||||||
$leaderIndex = 0
|
$leaderIndex = 0
|
||||||
|
|
||||||
|
|
|
@ -148,6 +148,26 @@ endi
|
||||||
|
|
||||||
print ============= step3: alter database
|
print ============= step3: alter database
|
||||||
sql alter database db replica 1
|
sql alter database db replica 1
|
||||||
|
$wt = 0
|
||||||
|
stepwt1:
|
||||||
|
$wt = $wt + 1
|
||||||
|
sleep 1000
|
||||||
|
if $wt == 200 then
|
||||||
|
print ====> dnode not ready!
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql show transactions
|
||||||
|
if $rows != 0 then
|
||||||
|
print wait 1 seconds to alter
|
||||||
|
goto stepwt1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql show db.vgroups
|
||||||
|
print ---> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||||
|
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data27 $data28 $data29
|
||||||
|
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data26 $data37 $data38 $data39
|
||||||
|
print ---> $data10 $data11 $data12 $data13 $data14 $data15 $data36 $data47 $data48 $data49
|
||||||
|
|
||||||
$hasleader = 0
|
$hasleader = 0
|
||||||
|
|
||||||
$x = 0
|
$x = 0
|
||||||
|
|
Loading…
Reference in New Issue