Merge pull request #25483 from taosdata/fix/3_liaohj

fix(query): add ts in cache_scan_operator if pk exists and only retri…
This commit is contained in:
Haojun Liao 2024-04-25 22:32:26 +08:00 committed by GitHub
commit b3d2ccec83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 346 additions and 291 deletions

View File

@ -15,7 +15,7 @@
#ifndef _TD_COMMON_DEF_H_
#define _TD_COMMON_DEF_H_
// #include "taosdef.h"
#include "tarray.h"
#include "tmsg.h"
#include "tvariant.h"
@ -412,6 +412,7 @@ typedef struct STUidTagInfo {
#define UD_TAG_COLUMN_INDEX 2
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);
#define TSMA_RES_STB_POSTFIX "_tsma_res_stb_"
#define MD5_OUTPUT_LEN 32

View File

@ -260,7 +260,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
bool tsc);
void taosCleanupCfg();
int32_t taosCfgDynamicOptions(SConfig *pCfg, char *name, bool forServer);
int32_t taosCfgDynamicOptions(SConfig *pCfg, const char *name, bool forServer);
struct SConfig *taosGetCfg();

View File

@ -98,34 +98,32 @@ typedef struct {
const char *value;
} SConfigPair;
typedef struct SConfig {
ECfgSrcType stype;
SArray *array;
} SConfig;
SConfig *cfgInit();
int32_t cfgLoad(SConfig *pCfg, ECfgSrcType cfgType, const void *sourceStr);
int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair
void cfgCleanup(SConfig *pCfg);
typedef struct SConfig SConfig;
typedef struct SConfigIter SConfigIter;
SConfig *cfgInit();
int32_t cfgLoad(SConfig *pCfg, ECfgSrcType cfgType, const void *sourceStr);
int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair
void cfgCleanup(SConfig *pCfg);
int32_t cfgGetSize(SConfig *pCfg);
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name);
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype);
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
SConfigIter *cfgCreateIter(SConfig *pConf);
SConfigItem *cfgNextIter(SConfigIter *pIter);
void cfgDestroyIter(SConfigIter *pIter);
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
// clang-format off
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope);
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
int8_t dynScope);
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
int8_t dynScope);
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope,
int8_t dynScope);
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope);
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope);
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope, int8_t dynScope);
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
// clang-format on
const char *cfgStypeStr(ECfgSrcType type);
const char *cfgDtypeStr(ECfgDataType type);

View File

@ -770,6 +770,32 @@ int taos_init() {
return tscInitRes;
}
const char* getCfgName(TSDB_OPTION option) {
const char* name = NULL;
switch (option) {
case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
name = "shellActivityTimer";
break;
case TSDB_OPTION_LOCALE:
name = "locale";
break;
case TSDB_OPTION_CHARSET:
name = "charset";
break;
case TSDB_OPTION_TIMEZONE:
name = "timezone";
break;
case TSDB_OPTION_USE_ADAPTER:
name = "useAdapter";
break;
default:
break;
}
return name;
}
int taos_options_imp(TSDB_OPTION option, const char *str) {
if (option == TSDB_OPTION_CONFIGDIR) {
#ifndef WINDOWS
@ -799,39 +825,26 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
SConfig *pCfg = taosGetCfg();
SConfigItem *pItem = NULL;
const char *name = getCfgName(option);
switch (option) {
case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
pItem = cfgGetItem(pCfg, "shellActivityTimer");
break;
case TSDB_OPTION_LOCALE:
pItem = cfgGetItem(pCfg, "locale");
break;
case TSDB_OPTION_CHARSET:
pItem = cfgGetItem(pCfg, "charset");
break;
case TSDB_OPTION_TIMEZONE:
pItem = cfgGetItem(pCfg, "timezone");
break;
case TSDB_OPTION_USE_ADAPTER:
pItem = cfgGetItem(pCfg, "useAdapter");
break;
default:
break;
if (name == NULL) {
tscError("Invalid option %d", option);
return -1;
}
pItem = cfgGetItem(pCfg, name);
if (pItem == NULL) {
tscError("Invalid option %d", option);
return -1;
}
int code = cfgSetItem(pCfg, pItem->name, str, CFG_STYPE_TAOS_OPTIONS);
int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS);
if (code != 0) {
tscError("failed to set cfg:%s to %s since %s", pItem->name, str, terrstr());
tscError("failed to set cfg:%s to %s since %s", name, str, terrstr());
} else {
tscInfo("set cfg:%s to %s", pItem->name, str);
tscInfo("set cfg:%s to %s", name, str);
if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
code = taosCfgDynamicOptions(pCfg, pItem->name, false);
code = taosCfgDynamicOptions(pCfg, name, false);
}
}

View File

@ -827,10 +827,14 @@ TEST(clientCase, projection_query_tables) {
// }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use test");
TAOS_RES* pRes = taos_query(pConn, "use cache_1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table st2 (ts timestamp, k int primary key, j varchar(1000)) tags(a int)");
pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");
// pRes = taos_query(pConn, "select last(ts), ts from cache_1.no_pk_t1");
if (taos_errno(pRes) != 0) {
printf("failed to exec query, %s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
// pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);");

View File

@ -1032,7 +1032,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32;
}
static int32_t taosSetSlowLogScope(char *pScope) {
static int32_t taosSetSlowLogScope(const char *pScope) {
if (NULL == pScope || 0 == strlen(pScope)) {
tsSlowLogScope = SLOW_LOG_TYPE_ALL;
return 0;
@ -1505,7 +1505,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize,
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
}
static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
terrno = TSDB_CODE_SUCCESS;
if (strcasecmp(name, "resetlog") == 0) {
@ -1583,11 +1583,12 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
}
static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
// todo fix race condition caused by update of config, pItem->str may be removed
static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
terrno = TSDB_CODE_SUCCESS;
SConfigItem *pItem = cfgGetItem(pCfg, name);
if (!pItem || (pItem->dynScope & CFG_DYN_CLIENT) == 0) {
if ((pItem == NULL) || (pItem->dynScope & CFG_DYN_CLIENT) == 0) {
uError("failed to config:%s, not support", name);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
@ -1598,6 +1599,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
switch (lowcaseName[0]) {
case 'd': {
if (strcasecmp("debugFlag", name) == 0) {
@ -1803,9 +1805,12 @@ _out:
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
}
int32_t taosCfgDynamicOptions(SConfig *pCfg, char *name, bool forServer) {
if (forServer) return taosCfgDynamicOptionsForServer(pCfg, name);
return taosCfgDynamicOptionsForClient(pCfg, name);
int32_t taosCfgDynamicOptions(SConfig *pCfg, const char *name, bool forServer) {
if (forServer) {
return taosCfgDynamicOptionsForServer(pCfg, name);
} else {
return taosCfgDynamicOptionsForClient(pCfg, name);
}
}
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal) {

View File

@ -17,6 +17,8 @@
#include "tmisce.h"
#include "tglobal.h"
#include "tjson.h"
#include "tdatablock.h"
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0;
memset(pEp->fqdn, 0, TSDB_FQDN_LEN);
@ -97,10 +99,10 @@ void epsetSort(SEpSet* pDst) {
SEp* s = &pDst->eps[j + 1];
int cmp = strncmp(f->fqdn, s->fqdn, sizeof(f->fqdn));
if (cmp > 0 || (cmp == 0 && f->port > s->port)) {
SEp ep = {0};
epAssign(&ep, f);
SEp ep1 = {0};
epAssign(&ep1, f);
epAssign(f, s);
epAssign(s, &ep);
epAssign(s, &ep1);
}
}
}
@ -216,3 +218,43 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t
return TSDB_CODE_SUCCESS;
}
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) {
SConfig* pConf = taosGetCfg();
int32_t numOfRows = 0;
int32_t col = startCol;
SConfigItem* pItem = NULL;
blockDataEnsureCapacity(pBlock, cfgGetSize(pConf));
SConfigIter* pIter = cfgCreateIter(pConf);
while ((pItem = cfgNextIter(pIter)) != NULL) {
col = startCol;
// GRANT_CFG_SKIP;
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, col++);
colDataSetVal(pColInfo, numOfRows, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, col++);
colDataSetVal(pColInfo, numOfRows, value, false);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen);
varDataSetLen(scope, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, col++);
colDataSetVal(pColInfo, numOfRows, scope, false);
numOfRows++;
}
pBlock->info.rows = numOfRows;
cfgDestroyIter(pIter);
return TSDB_CODE_SUCCESS;
}

View File

@ -333,39 +333,10 @@ SSDataBlock *dmBuildVariablesBlock(void) {
}
int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
/*int32_t code = */dumpConfToDataBlock(pBlock, 1);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem *pItem = taosArrayGet(tsCfg->array, i);
// GRANT_CFG_SKIP;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, (const char *)&dnodeId, false);
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, value, false);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen);
varDataSetLen(scope, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, scope, false);
numOfRows++;
}
pBlock->info.rows = numOfRows;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false);
return TSDB_CODE_SUCCESS;
}

View File

@ -211,7 +211,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (pReqTask != NULL) {
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", idstr, vgId, req.transId);
tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId, req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
@ -235,7 +235,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
if (!updated) {
if (updateEpSet) {
updated = updateEpSet;
}
@ -245,14 +245,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
}
if (updated) {
tqDebug("s-task:%s vgId:%d save task after update epset", idstr, vgId);
tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
} else {
tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
}
tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId);
streamTaskStop(pTask);
// keep the already updated info

View File

@ -367,11 +367,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end;
}
int32_t pkBufLen = 0;
if (pr->rowKey.numOfPKs > 0) {
pkBufLen = pr->pkColumn.bytes;
}
int32_t pkBufLen = (pr->rowKey.numOfPKs > 0)? pr->pkColumn.bytes:0;
for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;

View File

@ -967,46 +967,11 @@ static int32_t buildLocalVariablesResultDataBlock(SSDataBlock** pOutput) {
return TSDB_CODE_SUCCESS;
}
int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
int32_t numOfCfg = taosArrayGetSize(tsCfg->array);
int32_t numOfRows = 0;
blockDataEnsureCapacity(pBlock, numOfCfg);
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem* pItem = taosArrayGet(tsCfg->array, i);
// GRANT_CFG_SKIP;
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, name, false);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen);
varDataSetLen(value, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, value, false);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen);
varDataSetLen(scope, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, scope, false);
numOfRows++;
}
pBlock->info.rows = numOfRows;
return TSDB_CODE_SUCCESS;
}
static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = NULL;
int32_t code = buildLocalVariablesResultDataBlock(&pBlock);
if (TSDB_CODE_SUCCESS == code) {
code = setLocalVariablesResultIntoDataBlock(pBlock);
code = dumpConfToDataBlock(pBlock, 0);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildRetrieveTableRsp(pBlock, SHOW_LOCAL_VARIABLES_RESULT_COLS, pRsp);

View File

@ -278,7 +278,7 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check
// todo add a dummy function to avoid process check
if (pCtx[k].fpSet.process == NULL) {
continue;
}

View File

@ -221,6 +221,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList;
SStoreCacheReader* pReaderFn = &pInfo->readHandle.api.cacheFn;
SSDataBlock* pBufRes = pInfo->pBufferedRes;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
@ -237,18 +238,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) {
blockDataCleanup(pInfo->pBufferedRes);
if (pInfo->indexOfBufferedRes >= pBufRes->info.rows) {
blockDataCleanup(pBufRes);
taosArrayClear(pInfo->pUidList);
int32_t code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList);
int32_t code =
pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// check for tag values
int32_t resultRows = pInfo->pBufferedRes->info.rows;
int32_t resultRows = pBufRes->info.rows;
// the results may be null, if last values are all null
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
@ -257,12 +258,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) {
if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
int32_t slotId = pCol->info.slotId;
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId);
SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {

View File

@ -2837,7 +2837,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
if (pInput->pkData) {
pOutput->pkBytes = pInput->pkBytes;
memcpy(pOutput->buf+pOutput->bytes, pInput->pkData, pOutput->pkBytes);
memcpy(pOutput->buf + pOutput->bytes, pInput->pkData, pOutput->pkBytes);
pOutput->pkData = pOutput->buf + pOutput->bytes;
}
return TSDB_CODE_SUCCESS;
@ -2885,7 +2885,8 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
} else {
pInputInfo->pkData = NULL;
}
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -2343,7 +2343,7 @@ static EDealRes collectFuncs(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
}
SExprNode* pExpr = (SExprNode*)pNode;
bool bFound = false;
SNode* pn = NULL;
FOREACH(pn, pCxt->pFuncs) {

View File

@ -19,6 +19,9 @@
#include "tglobal.h"
#include "parser.h"
// primary key column always the second column if exists
#define PRIMARY_COLUMN_SLOT 1
typedef struct SLogicPlanContext {
SPlanContext* pPlanCxt;
SLogicNode* pCurrRoot;
@ -304,7 +307,7 @@ static SNode* createFirstCol(SRealTableNode* pTable, const SSchema* pSchema) {
return (SNode*)pCol;
}
static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) {
static int32_t addPrimaryTsCol(SRealTableNode* pTable, SNodeList** pCols) {
bool found = false;
SNode* pCol = NULL;
FOREACH(pCol, *pCols) {
@ -327,10 +330,10 @@ static int32_t addSystableFirstCol(SRealTableNode* pTable, SNodeList** pCols) {
return nodesListMakeStrictAppend(pCols, createFirstCol(pTable, pTable->pMeta->schema));
}
static int32_t addPkCol(SRealTableNode* pTable, SNodeList** pCols) {
static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) {
bool found = false;
SNode* pCol = NULL;
SSchema* pSchema = &pTable->pMeta->schema[1];
SSchema* pSchema = &pTable->pMeta->schema[PRIMARY_COLUMN_SLOT];
FOREACH(pCol, *pCols) {
if (pSchema->colId == ((SColumnNode*)pCol)->colId) {
found = true;
@ -348,9 +351,9 @@ static int32_t addDefaultScanCol(SRealTableNode* pTable, SNodeList** pCols) {
if (TSDB_SYSTEM_TABLE == pTable->pMeta->tableType) {
return addSystableFirstCol(pTable, pCols);
}
int32_t code = addPrimaryKeyCol(pTable, pCols);
int32_t code = addPrimaryTsCol(pTable, pCols);
if (code == TSDB_CODE_SUCCESS && hasPkInTable(pTable->pMeta)) {
code = addPkCol(pTable, pCols);
code = addPrimaryKeyCol(pTable, pCols);
}
return code;
}
@ -1802,7 +1805,7 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p
STableMeta* pMeta = ((SRealTableNode*)pDelete->pFromTable)->pMeta;
if (TSDB_CODE_SUCCESS == code && hasPkInTable(pMeta)) {
code = addPkCol((SRealTableNode*)pDelete->pFromTable, &pScan->pScanCols);
code = addPrimaryKeyCol((SRealTableNode*)pDelete->pFromTable, &pScan->pScanCols);
}
if (TSDB_CODE_SUCCESS == code && NULL != pDelete->pTagCond) {

View File

@ -3966,21 +3966,25 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
if (NULL != cxt.pLastCols) {
cxt.doAgg = false;
cxt.funcType = FUNCTION_TYPE_CACHE_LAST;
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true, cxt.pkBytes);
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false, cxt.pkBytes);
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols);
if (pPKTsCol && pScan->node.pTargets->length == 1) {
if (pPKTsCol && ((pScan->node.pTargets->length == 1) || (pScan->node.pTargets->length == 2 && cxt.pkBytes > 0))) {
// when select last(ts),ts from ..., we add another ts to targets
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol));
}
if (pNonPKCol && cxt.pLastCols->length == 1 &&
nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) {
// when select last(c1), c1 from ..., we add c1 to targets
sprintf(pNonPKCol->colName, "#sel_val.%p", pNonPKCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pNonPKCol));
}
nodesClearList(cxt.pLastCols);
}
nodesClearList(cxt.pOtherCols);

View File

@ -94,6 +94,24 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
return 0;
}
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);

View File

@ -29,19 +29,15 @@ typedef struct SLaunchHTaskInfo {
STaskId hTaskId;
} SLaunchHTaskInfo;
typedef struct STaskRecheckInfo {
SStreamTask* pTask;
SStreamTaskCheckReq req;
void* checkTimer;
} STaskRecheckInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId,
int32_t hTaskId);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doProcessDownstreamReadyRsp(SStreamTask* pTask);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
@ -83,7 +79,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0;
}
static void doExecScanhistoryInFuture(void* param, void* tmrId) {
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
@ -139,7 +135,7 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
return TSDB_CODE_SUCCESS;
}
static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
@ -663,16 +659,15 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
taosMemoryFree(pInfo);
}
SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId,
int32_t hTaskId) {
SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId) {
SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pInfo->id.streamId = streamId;
pInfo->id.taskId = taskId;
pInfo->id.streamId = pTaskId->streamId;
pInfo->id.taskId = pTaskId->taskId;
pInfo->hTaskId.streamId = hStreamId;
pInfo->hTaskId.taskId = hTaskId;
@ -691,7 +686,8 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId);
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId);
STaskId id = streamTaskGetTaskId(pTask);
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId);
if (pInfo == NULL) {
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
@ -860,24 +856,6 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0;
}
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;

View File

@ -36,15 +36,20 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pTask->info.epSet, pEpSet);
}
epsetAssign(&pTask->info.epSet, pEpSet);
bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet);
epsetToStr(pEpSet, buf, tListLen(buf));
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
if (!isEqual) {
(*pUpdated) = true;
char tmp[512] = {0};
epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp));
epsetAssign(&pTask->info.epSet, pEpSet);
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf);
}
}
// check for the dispatch info and the upstream task info
@ -620,13 +625,21 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
for (int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->nodeId == nodeId) {
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pInfo->epSet, pEpSet);
bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
if (!equal) {
*pUpdated = true;
char tmp[512] = {0};
epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
pInfo->taskId, nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId,
pInfo->taskId, nodeId, buf);
}
epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId,
nodeId, buf);
break;
}
}
@ -653,7 +666,6 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool *pUpdated) {
char buf[512] = {0};
epsetToStr(pEpSet, buf, tListLen(buf));
*pUpdated = false;
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;
@ -661,29 +673,43 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo->vgId == nodeId) {
if (!(*pUpdated)) {
(*pUpdated) = isEpsetEqual(&pVgInfo->epSet, pEpSet);
}
bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet);
if (!isEqual) {
*pUpdated = true;
char tmp[512] = {0};
epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, buf);
epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
pVgInfo->taskId, nodeId, buf);
}
break;
}
}
} else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
if (pDispatcher->nodeId == nodeId) {
if (!(*pUpdated)) {
*pUpdated = isEpsetEqual(&pDispatcher->epSet, pEpSet);
}
bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet);
if (!equal) {
*pUpdated = true;
epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, buf);
char tmp[512] = {0};
epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
nodeId, buf, tmp);
} else {
stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id,
pDispatcher->taskId, nodeId, buf);
}
}
}
}
@ -1315,7 +1341,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
} else {
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr);
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);

View File

@ -27,12 +27,17 @@
#define CFG_NAME_PRINT_LEN 24
#define CFG_SRC_PRINT_LEN 12
struct SConfig {
ECfgSrcType stype;
SArray *array;
TdThreadMutex lock;
};
int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath);
int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *filepath);
int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile);
int32_t cfgLoadFromEnvVar(SConfig *pConfig);
int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd);
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url);
int32_t cfgSetItem(SConfig *pConfig, const char *name, const char *value, ECfgSrcType stype);
extern char **environ;
@ -50,6 +55,7 @@ SConfig *cfgInit() {
return NULL;
}
taosThreadMutexInit(&pCfg->lock, NULL);
return pCfg;
}
@ -87,9 +93,9 @@ static void cfgFreeItem(SConfigItem *pItem) {
pItem->dtype == CFG_DTYPE_CHARSET || pItem->dtype == CFG_DTYPE_TIMEZONE) {
taosMemoryFreeClear(pItem->str);
}
if (pItem->array) {
taosArrayDestroy(pItem->array);
pItem->array = NULL;
pItem->array = taosArrayDestroy(pItem->array);
}
}
@ -102,37 +108,18 @@ void cfgCleanup(SConfig *pCfg) {
taosMemoryFreeClear(pItem->name);
}
taosArrayDestroy(pCfg->array);
taosThreadMutexDestroy(&pCfg->lock);
taosMemoryFree(pCfg);
}
}
int32_t cfgGetSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->array); }
static int32_t cfgCheckAndSetTimezone(SConfigItem *pItem, const char *timezone) {
static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) {
cfgFreeItem(pItem);
pItem->str = taosStrdup(timezone);
if (pItem->str == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
ASSERT(pItem->str == NULL);
return 0;
}
static int32_t cfgCheckAndSetCharset(SConfigItem *pItem, const char *charset) {
cfgFreeItem(pItem);
pItem->str = taosStrdup(charset);
if (pItem->str == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static int32_t cfgCheckAndSetLocale(SConfigItem *pItem, const char *locale) {
cfgFreeItem(pItem);
pItem->str = taosStrdup(locale);
pItem->str = taosStrdup(conf);
if (pItem->str == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -229,7 +216,7 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s
return -1;
}
taosMemoryFree(pItem->str);
taosMemoryFreeClear(pItem->str);
pItem->str = tmp;
pItem->stype = stype;
return 0;
@ -246,20 +233,8 @@ static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType styp
return 0;
}
static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetLocale(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
return -1;
}
pItem->stype = stype;
return 0;
}
static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetCharset(pItem, value) != 0) {
static int32_t doSetConf(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetConf(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
@ -271,18 +246,13 @@ static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType
}
static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetTimezone(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
return -1;
int32_t code = doSetConf(pItem, value, stype);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pItem->stype = stype;
// apply new timezone
osSetTimezone(value);
return 0;
return code;
}
static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, const char *level, const char *primary,
@ -342,40 +312,61 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype) {
// GRANT_CFG_SET;
int32_t code = 0;
SConfigItem *pItem = cfgGetItem(pCfg, name);
if (pItem == NULL) {
terrno = TSDB_CODE_CFG_NOT_FOUND;
return -1;
}
taosThreadMutexLock(&pCfg->lock);
switch (pItem->dtype) {
case CFG_DTYPE_BOOL:
return cfgSetBool(pItem, value, stype);
case CFG_DTYPE_INT32:
return cfgSetInt32(pItem, value, stype);
case CFG_DTYPE_INT64:
return cfgSetInt64(pItem, value, stype);
case CFG_DTYPE_BOOL: {
code = cfgSetBool(pItem, value, stype);
break;
}
case CFG_DTYPE_INT32: {
code = cfgSetInt32(pItem, value, stype);
break;
}
case CFG_DTYPE_INT64: {
code = cfgSetInt64(pItem, value, stype);
break;
}
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
return cfgSetFloat(pItem, value, stype);
case CFG_DTYPE_STRING:
return cfgSetString(pItem, value, stype);
case CFG_DTYPE_DIR:
return cfgSetDir(pItem, value, stype);
case CFG_DTYPE_TIMEZONE:
return cfgSetTimezone(pItem, value, stype);
case CFG_DTYPE_CHARSET:
return cfgSetCharset(pItem, value, stype);
case CFG_DTYPE_LOCALE:
return cfgSetLocale(pItem, value, stype);
case CFG_DTYPE_DOUBLE: {
code = cfgSetFloat(pItem, value, stype);
break;
}
case CFG_DTYPE_STRING: {
code = cfgSetString(pItem, value, stype);
break;
}
case CFG_DTYPE_DIR: {
code = cfgSetDir(pItem, value, stype);
break;
}
case CFG_DTYPE_TIMEZONE: {
code = cfgSetTimezone(pItem, value, stype);
break;
}
case CFG_DTYPE_CHARSET: {
code = doSetConf(pItem, value, stype);
break;
}
case CFG_DTYPE_LOCALE: {
code = doSetConf(pItem, value, stype);
break;
}
case CFG_DTYPE_NONE:
default:
terrno = TSDB_CODE_INVALID_CFG;
break;
}
_err_out:
terrno = TSDB_CODE_INVALID_CFG;
return -1;
taosThreadMutexUnlock(&pCfg->lock);
return code;
}
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
@ -388,16 +379,16 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
}
}
// uError("name:%s, cfg not found", name);
terrno = TSDB_CODE_CFG_NOT_FOUND;
return NULL;
}
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) {
ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT;
SConfigItem *pItem = cfgGetItem(pCfg, name);
if (!pItem || (pItem->dynScope & dynType) == 0) {
uError("failed to config:%s, not support", name);
uError("failed to config:%s, not support update this config", name);
terrno = TSDB_CODE_INVALID_CFG;
return -1;
}
@ -459,7 +450,7 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
return -1;
}
int size = pCfg->array->size;
int32_t size = taosArrayGetSize(pCfg->array);
for (int32_t i = 0; i < size; ++i) {
SConfigItem *existItem = taosArrayGet(pCfg->array, i);
if (existItem != NULL && strcmp(existItem->name, pItem->name) == 0) {
@ -559,7 +550,7 @@ int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .scope = scope, .dynScope = dynScope};
if (cfgCheckAndSetLocale(&item, defaultVal) != 0) {
if (cfgCheckAndSetConf(&item, defaultVal) != 0) {
return -1;
}
@ -568,7 +559,7 @@ int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, in
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .scope = scope, .dynScope = dynScope};
if (cfgCheckAndSetCharset(&item, defaultVal) != 0) {
if (cfgCheckAndSetConf(&item, defaultVal) != 0) {
return -1;
}
@ -577,7 +568,7 @@ int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, i
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .scope = scope, .dynScope = dynScope};
if (cfgCheckAndSetTimezone(&item, defaultVal) != 0) {
if (cfgCheckAndSetConf(&item, defaultVal) != 0) {
return -1;
}
@ -1356,3 +1347,35 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl
uInfo("fail get apollo url from cmd env file");
return -1;
}
struct SConfigIter {
int32_t index;
SConfig *pConf;
};
SConfigIter *cfgCreateIter(SConfig *pConf) {
SConfigIter* pIter = taosMemoryCalloc(1, sizeof(SConfigIter));
if (pIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pIter->pConf = pConf;
return pIter;
}
SConfigItem *cfgNextIter(SConfigIter* pIter) {
if (pIter->index < cfgGetSize(pIter->pConf)) {
return taosArrayGet(pIter->pConf->array, pIter->index++);
}
return NULL;
}
void cfgDestroyIter(SConfigIter *pIter) {
if (pIter == NULL) {
return;
}
taosMemoryFree(pIter);
}

View File

@ -63,9 +63,11 @@ TEST_F(CfgTest, 02_Basic) {
EXPECT_EQ(cfgGetSize(pConfig), 6);
int32_t size = taosArrayGetSize(pConfig->array);
for (int32_t i = 0; i < size; ++i) {
SConfigItem *pItem = (SConfigItem *)taosArrayGet(pConfig->array, i);
int32_t size = cfgGetSize(pConfig);
SConfigItem* pItem = NULL;
SConfigIter* pIter = cfgCreateIter(pConfig);
while((pItem == cfgNextIter(pIter)) != NULL) {
switch (pItem->dtype) {
case CFG_DTYPE_BOOL:
printf("index:%d, cfg:%s value:%d\n", size, pItem->name, pItem->bval);
@ -90,9 +92,12 @@ TEST_F(CfgTest, 02_Basic) {
break;
}
}
cfgDestroyIter(pIter);
EXPECT_EQ(cfgGetSize(pConfig), 6);
SConfigItem *pItem = cfgGetItem(pConfig, "test_bool");
pItem = cfgGetItem(pConfig, "test_bool");
EXPECT_EQ(pItem->stype, CFG_STYPE_DEFAULT);
EXPECT_EQ(pItem->dtype, CFG_DTYPE_BOOL);
EXPECT_STREQ(pItem->name, "test_bool");

View File

@ -49,7 +49,7 @@ class TDTestCase(TBase):
def checkQueryOK(self, rets):
if rets[-2][:9] != "Query OK,":
tdLog.exit(f"check taos -s return unecpect: {rets}")
tdLog.exit(f"check taos -s return unexpect: {rets}")
def doTaos(self):
tdLog.info(f"check taos command options...")