Merge branch '3.0' into fix/3_liaohj
This commit is contained in:
commit
f0dda78c61
|
@ -126,6 +126,7 @@ extern "C" {
|
|||
|
||||
extern int32_t tsRandErrChance;
|
||||
extern int64_t tsRandErrDivisor;
|
||||
extern int64_t tsRandErrScope;
|
||||
extern threadlocal bool tsEnableRandErr;
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -568,6 +568,13 @@ enum {
|
|||
SND_WORKER_TYPE__UNIQUE,
|
||||
};
|
||||
|
||||
enum {
|
||||
RAND_ERR_MEMORY = 1,
|
||||
RAND_ERR_FILE = 2,
|
||||
// RAND_ERR_SCOPE_XXX... = 4,
|
||||
// ...
|
||||
};
|
||||
|
||||
#define DEFAULT_HANDLE 0
|
||||
#define MNODE_HANDLE 1
|
||||
#define QNODE_HANDLE -1
|
||||
|
|
|
@ -209,11 +209,11 @@ function clean_service_on_launchctl() {
|
|||
}
|
||||
|
||||
function remove_data_and_config() {
|
||||
data_dir=`grep dataDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
|
||||
data_dir=`grep dataDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
|
||||
if [ X"$data_dir" == X"" ]; then
|
||||
data_dir="/var/lib/${PREFIX}"
|
||||
fi
|
||||
log_dir=`grep logDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
|
||||
log_dir=`grep logDir /etc/${PREFIX}/${PREFIX}.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
|
||||
if [ X"$log_dir" == X"" ]; then
|
||||
log_dir="/var/log/${PREFIX}"
|
||||
fi
|
||||
|
|
|
@ -596,6 +596,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
CFG_SCOPE_CLIENT, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(
|
||||
cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
|
||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
|
||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
|
||||
|
||||
tsNumOfRpcThreads = tsNumOfCores / 2;
|
||||
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
|
||||
|
@ -778,8 +781,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_SERVER));
|
||||
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
|
@ -1214,6 +1215,15 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "metaCacheMaxSize");
|
||||
tsMetaCacheMaxSize = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorChance");
|
||||
tsRandErrChance = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorDivisor");
|
||||
tsRandErrDivisor = pItem->i64;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorScope");
|
||||
tsRandErrScope = pItem->i64;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "countAlwaysReturnValue");
|
||||
tsCountAlwaysReturnValue = pItem->i32;
|
||||
|
||||
|
@ -1470,12 +1480,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "mqRebalanceInterval");
|
||||
tsMqRebalanceInterval = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorChance");
|
||||
tsRandErrChance = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorDivisor");
|
||||
tsRandErrDivisor = pItem->i64;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "ttlUnit");
|
||||
tsTtlUnit = pItem->i32;
|
||||
|
||||
|
@ -1930,7 +1934,9 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
|||
|
||||
{"mndSdbWriteDelta", &tsMndSdbWriteDelta},
|
||||
{"minDiskFreeSize", &tsMinDiskFreeSize},
|
||||
{"randErrorChance", &tsRandErrChance},
|
||||
{"randErrorDivisor", &tsRandErrDivisor},
|
||||
{"randErrorScope", &tsRandErrScope},
|
||||
|
||||
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
|
||||
{"checkpointInterval", &tsStreamCheckpointInterval},
|
||||
|
@ -2209,6 +2215,9 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
|||
{"queryPlannerTrace", &tsQueryPlannerTrace},
|
||||
{"queryNodeChunkSize", &tsQueryNodeChunkSize},
|
||||
{"queryUseNodeAllocator", &tsQueryUseNodeAllocator},
|
||||
{"randErrorChance", &tsRandErrChance},
|
||||
{"randErrorDivisor", &tsRandErrDivisor},
|
||||
{"randErrorScope", &tsRandErrScope},
|
||||
{"smlDot2Underline", &tsSmlDot2Underline},
|
||||
{"shellActivityTimer", &tsShellActivityTimer},
|
||||
{"useAdapter", &tsUseAdapter},
|
||||
|
|
|
@ -125,7 +125,7 @@ void dmLogCrash(int signum, void *sigInfo, void *context) {
|
|||
|
||||
_return:
|
||||
|
||||
taosLogCrashInfo("taosd", pMsg, msgLen, signum, sigInfo);
|
||||
taosLogCrashInfo(CUS_PROMPT "d", pMsg, msgLen, signum, sigInfo);
|
||||
|
||||
#ifdef _TD_DARWIN_64
|
||||
exit(signum);
|
||||
|
@ -258,7 +258,7 @@ static void dmPrintArgs(int32_t argc, char const *argv[]) {
|
|||
static void dmGenerateGrant() { mndGenerateMachineCode(); }
|
||||
|
||||
static void dmPrintVersion() {
|
||||
printf("%s\ntaosd version: %s compatible_version: %s\n", TD_PRODUCT_NAME, version, compatible_version);
|
||||
printf("%s\n%sd version: %s compatible_version: %s\n", TD_PRODUCT_NAME, CUS_PROMPT, version, compatible_version);
|
||||
printf("git: %s\n", gitinfo);
|
||||
#ifdef TD_ENTERPRISE
|
||||
printf("gitOfInternal: %s\n", gitinfoOfInternal);
|
||||
|
@ -268,7 +268,7 @@ static void dmPrintVersion() {
|
|||
|
||||
static void dmPrintHelp() {
|
||||
char indent[] = " ";
|
||||
printf("Usage: taosd [OPTION...] \n\n");
|
||||
printf("Usage: %sd [OPTION...] \n\n", CUS_PROMPT);
|
||||
printf("%s%s%s%s\n", indent, "-a,", indent, DM_APOLLO_URL);
|
||||
printf("%s%s%s%s\n", indent, "-c,", indent, DM_CFG_DIR);
|
||||
printf("%s%s%s%s\n", indent, "-s,", indent, DM_SDB_INFO);
|
||||
|
|
|
@ -1211,22 +1211,22 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
|
|||
if (pTrans == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
mInfo("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
|
||||
|
||||
mndTransSetDbName(pTrans, pOld->name, NULL);
|
||||
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
TAOS_CHECK_RETURN(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew));
|
||||
TAOS_CHECK_RETURN(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew));
|
||||
TAOS_CHECK_RETURN(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew));
|
||||
TAOS_CHECK_RETURN(mndTransPrepare(pMnode, pTrans));
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
||||
|
|
|
@ -1406,7 +1406,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
|
||||
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
|
||||
SCacheRowsReader *pr, int8_t ltype) {
|
||||
int32_t code = 0;
|
||||
int32_t code = 0, lino = 0;
|
||||
rocksdb_writebatch_t *wb = NULL;
|
||||
SArray *pTmpColArray = NULL;
|
||||
|
||||
|
@ -1440,11 +1440,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
int lastrowIndex = 0;
|
||||
|
||||
if (!slotIds || !lastColIds || !lastSlotIds || !lastrowColIds || !lastrowSlotIds) {
|
||||
taosMemoryFree(slotIds);
|
||||
taosMemoryFree(lastColIds);
|
||||
taosMemoryFree(lastSlotIds);
|
||||
taosMemoryFree(lastrowColIds);
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
|
@ -1454,9 +1450,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
if (NULL == lastTmpIndexArray) {
|
||||
lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
|
||||
if (!lastTmpIndexArray) {
|
||||
taosArrayDestroy(lastrowTmpIndexArray);
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
(void)taosArrayPush(lastTmpIndexArray, &(i));
|
||||
|
@ -1467,9 +1461,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
if (NULL == lastrowTmpIndexArray) {
|
||||
lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
|
||||
if (!lastrowTmpIndexArray) {
|
||||
taosArrayDestroy(lastTmpIndexArray);
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
(void)taosArrayPush(lastrowTmpIndexArray, &(i));
|
||||
|
@ -1481,13 +1473,11 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
|
||||
pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
|
||||
if (!pTmpColArray) {
|
||||
taosArrayDestroy(lastrowTmpIndexArray);
|
||||
taosArrayDestroy(lastTmpIndexArray);
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (lastTmpIndexArray != NULL) {
|
||||
(void)mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds);
|
||||
TAOS_CHECK_EXIT(mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds));
|
||||
for (int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
|
||||
(void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastTmpIndexArray, i),
|
||||
taosArrayGet(lastTmpColArray, i));
|
||||
|
@ -1495,7 +1485,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
}
|
||||
|
||||
if (lastrowTmpIndexArray != NULL) {
|
||||
(void)mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds);
|
||||
TAOS_CHECK_EXIT(mergeLastRowCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds));
|
||||
for (int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
|
||||
(void)taosArrayInsert(pTmpColArray, *(int32_t *)taosArrayGet(lastrowTmpIndexArray, i),
|
||||
taosArrayGet(lastrowTmpColArray, i));
|
||||
|
@ -1517,7 +1507,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
.cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
if (!pLastCol) {
|
||||
pLastCol = &noneCol;
|
||||
TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal));
|
||||
TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal));
|
||||
}
|
||||
|
||||
taosArraySet(pLastArray, idxKey->idx, pLastCol);
|
||||
|
@ -1532,12 +1522,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (!pTmpLastCol) {
|
||||
taosMemoryFree(slotIds);
|
||||
taosMemoryFree(lastColIds);
|
||||
taosMemoryFree(lastSlotIds);
|
||||
taosMemoryFree(lastrowColIds);
|
||||
taosMemoryFree(lastrowSlotIds);
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
|
@ -1546,12 +1531,12 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
|
||||
SValue *pValue = &pLastCol->rowKey.pks[i];
|
||||
if (IS_VAR_DATA_TYPE(pValue->type)) {
|
||||
TAOS_CHECK_RETURN(reallocVarDataVal(pValue));
|
||||
TAOS_CHECK_EXIT(reallocVarDataVal(pValue));
|
||||
charge += pValue->nData;
|
||||
}
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
|
||||
TAOS_CHECK_RETURN(reallocVarData(&pLastCol->colVal));
|
||||
TAOS_CHECK_EXIT(reallocVarData(&pLastCol->colVal));
|
||||
charge += pLastCol->colVal.value.nData;
|
||||
}
|
||||
|
||||
|
@ -1580,6 +1565,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
rocksMayWrite(pTsdb, false, true);
|
||||
}
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(lastrowTmpIndexArray);
|
||||
taosArrayDestroy(lastrowTmpColArray);
|
||||
taosArrayDestroy(lastTmpIndexArray);
|
||||
|
@ -2951,11 +2937,13 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
|
|||
|
||||
if (!pIter->pSkyline) {
|
||||
pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||
TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
uint64_t uid = pIter->idx.uid;
|
||||
STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
|
||||
if (pInfo->pTombData == NULL) {
|
||||
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
|
||||
TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
(void)taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData);
|
||||
|
@ -2963,6 +2951,7 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
|
|||
size_t delSize = TARRAY_SIZE(pInfo->pTombData);
|
||||
if (delSize > 0) {
|
||||
code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline);
|
||||
TAOS_CHECK_GOTO(code, &lino, _err);
|
||||
}
|
||||
pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1;
|
||||
}
|
||||
|
@ -3065,11 +3054,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
|
||||
// inverse iterator
|
||||
CacheNextRowIter iter = {0};
|
||||
(void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
code =
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
TAOS_CHECK_GOTO(code, &lino, _err);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
(void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||
code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
|
@ -3250,11 +3241,13 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
|
||||
// inverse iterator
|
||||
CacheNextRowIter iter = {0};
|
||||
(void)nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
code =
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
TAOS_CHECK_GOTO(code, &lino, _err);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
(void)nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||
code = nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
|
|
|
@ -157,41 +157,35 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
|
|||
int64_t numRecord = 0;
|
||||
SMetaInfo info;
|
||||
|
||||
if (committer->tsdb->imem->nDel == 0) {
|
||||
goto _exit;
|
||||
}
|
||||
// if no history data and no new timestamp data, skip tomb data
|
||||
if (committer->ctx->info->fset || committer->ctx->hasTSData) {
|
||||
committer->ctx->tbid->suid = 0;
|
||||
committer->ctx->tbid->uid = 0;
|
||||
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
|
||||
if (record->uid != committer->ctx->tbid->uid) {
|
||||
committer->ctx->tbid->suid = record->suid;
|
||||
committer->ctx->tbid->uid = record->uid;
|
||||
|
||||
// do not need to write tomb data if there is no ts data
|
||||
bool skip = (committer->ctx->info->fset == NULL && !committer->ctx->hasTSData);
|
||||
|
||||
committer->ctx->tbid->suid = 0;
|
||||
committer->ctx->tbid->uid = 0;
|
||||
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
|
||||
if (record->uid != committer->ctx->tbid->uid) {
|
||||
committer->ctx->tbid->suid = record->suid;
|
||||
committer->ctx->tbid->uid = record->uid;
|
||||
|
||||
if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
|
||||
TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit);
|
||||
continue;
|
||||
if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
|
||||
TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid), &lino, _exit);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (record->ekey < committer->ctx->minKey) {
|
||||
// do nothing
|
||||
} else if (record->skey > committer->ctx->maxKey) {
|
||||
// committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey);
|
||||
} else {
|
||||
record->skey = TMAX(record->skey, committer->ctx->minKey);
|
||||
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
|
||||
if (record->ekey < committer->ctx->minKey) {
|
||||
// do nothing
|
||||
} else if (record->skey > committer->ctx->maxKey) {
|
||||
// committer->ctx->nextKey = TMIN(record->skey, committer->ctx->nextKey);
|
||||
} else {
|
||||
record->skey = TMAX(record->skey, committer->ctx->minKey);
|
||||
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
|
||||
|
||||
if (!skip) {
|
||||
numRecord++;
|
||||
TAOS_CHECK_GOTO(tsdbFSetWriteTombRecord(committer->writer, record), &lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit);
|
||||
TAOS_CHECK_GOTO(tsdbIterMergerNext(committer->tombIterMerger), &lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -463,7 +463,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
|
|||
|
||||
if (cid < blockCol.cid) {
|
||||
const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
|
||||
ASSERT(tcol);
|
||||
TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
|
||||
SBlockCol none = {
|
||||
.cid = cid,
|
||||
.type = tcol->type,
|
||||
|
|
|
@ -1017,6 +1017,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
|||
pSttDataInfo->numOfRows += numOfRows;
|
||||
}
|
||||
} else {
|
||||
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
||||
if (!pMTree->ignoreEarlierTs) {
|
||||
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
|
||||
}
|
||||
|
|
|
@ -2440,20 +2440,23 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
|||
|
||||
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(info.pKeyRangeList);
|
||||
pReader->code = code;
|
||||
return false;
|
||||
}
|
||||
|
||||
code = initMemDataIterator(pScanInfo, pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(info.pKeyRangeList);
|
||||
pReader->code = code;
|
||||
return false;
|
||||
}
|
||||
|
||||
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(info.pKeyRangeList);
|
||||
pReader->code = code;
|
||||
return code;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (conf.rspRows) {
|
||||
|
|
|
@ -309,7 +309,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
|
|||
|
||||
if (cid < blockCol.cid) {
|
||||
const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
|
||||
ASSERT(tcol);
|
||||
TSDB_CHECK_NULL(tcol, code, lino, _exit, TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER);
|
||||
SBlockCol none = {
|
||||
.cid = cid,
|
||||
.type = tcol->type,
|
||||
|
|
|
@ -477,10 +477,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
|||
blockDataDestroy(pInfo->pDelRes);
|
||||
blockDataDestroy(pInfo->pMidRetriveRes);
|
||||
blockDataDestroy(pInfo->pMidPulloverRes);
|
||||
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
||||
if (pInfo->stateStore.streamFileStateDestroy != NULL) {
|
||||
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
|
||||
}
|
||||
taosArrayDestroy(pInfo->pMidPullDatas);
|
||||
|
||||
if (pInfo->pState->dump == 1) {
|
||||
if (pInfo->pState !=NULL && pInfo->pState->dump == 1) {
|
||||
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
|
||||
taosMemoryFreeClear(pInfo->pState->pTdbState);
|
||||
}
|
||||
|
@ -1953,12 +1955,14 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
pInfo->numOfDatapack = 0;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
||||
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
|
||||
QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno);
|
||||
|
||||
pInfo->dataVersion = 0;
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->recvGetAll = false;
|
||||
pInfo->recvPullover = false;
|
||||
pInfo->recvRetrive = false;
|
||||
|
@ -2032,7 +2036,9 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
|
|||
tSimpleHashCleanup(pSup->pResultRows);
|
||||
destroyDiskbasedBuf(pSup->pResultBuf);
|
||||
blockDataDestroy(pSup->pScanBlock);
|
||||
pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState);
|
||||
if (pSup->stateStore.streamFileStateDestroy != NULL) {
|
||||
pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState);
|
||||
}
|
||||
taosMemoryFreeClear(pSup->pState);
|
||||
taosMemoryFreeClear(pSup->pDummyCtx);
|
||||
}
|
||||
|
@ -2141,7 +2147,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||
SStorageAPI* pApi, int32_t tsIndex) {
|
||||
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -2156,6 +2162,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
}
|
||||
|
||||
pSup->stateStore = *pStore;
|
||||
pSup->pSessionAPI = pApi;
|
||||
|
||||
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
||||
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
|
@ -2168,6 +2175,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
|
||||
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT);
|
||||
QUERY_CHECK_NULL(pSup->pState->pFileState, code, lino, _end, terrno);
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||
|
@ -2179,8 +2187,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||
}
|
||||
|
||||
pSup->pSessionAPI = pApi;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
|
||||
|
@ -5308,9 +5319,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
pInfo->pUpdatedMap = NULL;
|
||||
int32_t funResSize = getMaxFunResSize(pSup, numOfCols);
|
||||
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
|
||||
QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
@ -5319,7 +5332,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
|
||||
|
|
|
@ -2573,13 +2573,14 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
|||
|
||||
for (int32_t i = 0; i < 2; ++i) {
|
||||
uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, i))->type;
|
||||
if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType)) {
|
||||
if (!IS_STR_DATA_TYPE(paraType) && !IS_INTEGER_TYPE(paraType) && !IS_TIMESTAMP_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t para2Type;
|
||||
if (3 == numOfParams) {
|
||||
if (!IS_INTEGER_TYPE(getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type)) {
|
||||
para2Type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 2))->type;
|
||||
if (!IS_INTEGER_TYPE(para2Type) && !IS_NULL_TYPE(para2Type)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
}
|
||||
|
@ -2587,7 +2588,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
|||
// add database precision as param
|
||||
uint8_t dbPrec = pFunc->node.resType.precision;
|
||||
|
||||
if (3 == numOfParams) {
|
||||
if (3 == numOfParams && !IS_NULL_TYPE(para2Type)) {
|
||||
int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2));
|
||||
if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) {
|
||||
return buildFuncErrMsg(pErrBuf, len, code,
|
||||
|
|
|
@ -1051,7 +1051,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
} else {
|
||||
// todo this should replace the existed object put by replay creating stream task msg from mnode
|
||||
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
|
||||
taosMemoryFree(pTask);
|
||||
tFreeStreamTask(pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -131,21 +131,27 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
|
|||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
||||
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
|
||||
int8_t type) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (memSize <= 0) {
|
||||
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
|
||||
}
|
||||
if (rowSize == 0) {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
|
||||
if (!pFileState) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_NULL(pFileState, code, lino, _error, terrno);
|
||||
|
||||
rowSize += selectRowSize;
|
||||
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
|
||||
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _error, terrno);
|
||||
|
||||
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _error, terrno);
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
|
||||
if (type == STREAM_STATE_BUFF_HASH) {
|
||||
|
@ -171,10 +177,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
pFileState->cfName = taosStrdup("sess");
|
||||
pFileState->stateFunctionGetFn = getSessionRowBuff;
|
||||
}
|
||||
|
||||
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _error, terrno);
|
||||
|
||||
pFileState->keyLen = keySize;
|
||||
pFileState->rowSize = rowSize;
|
||||
|
@ -188,6 +191,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
pFileState->flushMark = INT64_MIN;
|
||||
pFileState->maxTs = INT64_MIN;
|
||||
pFileState->id = taosStrdup(taskId);
|
||||
QUERY_CHECK_NULL(pFileState->id, code, lino, _error, terrno);
|
||||
|
||||
// todo(liuyao) optimize
|
||||
if (type == STREAM_STATE_BUFF_HASH) {
|
||||
|
@ -198,8 +202,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
|
||||
void* valBuf = NULL;
|
||||
int32_t len = 0;
|
||||
int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
|
||||
if (tmpRes == TSDB_CODE_SUCCESS) {
|
||||
ASSERT(len == sizeof(TSKEY));
|
||||
streamFileStateDecode(&pFileState->flushMark, valBuf, len);
|
||||
qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
|
||||
|
@ -208,6 +212,9 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
return pFileState;
|
||||
|
||||
_error:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
streamFileStateDestroy(pFileState);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#define ALLOW_FORBID_FUNC
|
||||
#include "os.h"
|
||||
#include "osSemaphore.h"
|
||||
#include "tdef.h"
|
||||
#include "zlib.h"
|
||||
|
||||
#ifdef WINDOWS
|
||||
|
@ -65,14 +66,14 @@ typedef struct TdFile {
|
|||
#define FILE_WITH_LOCK 1
|
||||
|
||||
#ifdef BUILD_WITH_RAND_ERR
|
||||
#define STUB_RAND_IO_ERR(ret) \
|
||||
if (tsEnableRandErr) { \
|
||||
uint32_t r = taosRand() % tsRandErrDivisor; \
|
||||
if ((r + 1) <= tsRandErrChance) { \
|
||||
errno = EIO; \
|
||||
terrno = TAOS_SYSTEM_ERROR(errno); \
|
||||
return (ret); \
|
||||
} \
|
||||
#define STUB_RAND_IO_ERR(ret) \
|
||||
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_FILE)) { \
|
||||
uint32_t r = taosRand() % tsRandErrDivisor; \
|
||||
if ((r + 1) <= tsRandErrChance) { \
|
||||
errno = EIO; \
|
||||
terrno = TAOS_SYSTEM_ERROR(errno); \
|
||||
return (ret); \
|
||||
} \
|
||||
}
|
||||
#else
|
||||
#define STUB_RAND_IO_ERR(ret)
|
||||
|
|
|
@ -20,9 +20,11 @@
|
|||
#include <malloc.h>
|
||||
#endif
|
||||
#include "os.h"
|
||||
#include "tdef.h"
|
||||
|
||||
int32_t tsRandErrChance = 1;
|
||||
int64_t tsRandErrDivisor = 10001;
|
||||
int64_t tsRandErrScope = (RAND_ERR_MEMORY | RAND_ERR_FILE);
|
||||
threadlocal bool tsEnableRandErr = 0;
|
||||
|
||||
#if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE)
|
||||
|
@ -272,7 +274,7 @@ void *taosMemoryMalloc(int64_t size) {
|
|||
#else
|
||||
|
||||
#ifdef BUILD_WITH_RAND_ERR
|
||||
if (tsEnableRandErr) {
|
||||
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) {
|
||||
uint32_t r = taosRand() % tsRandErrDivisor;
|
||||
if ((r + 1) <= tsRandErrChance) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -302,7 +304,7 @@ void *taosMemoryCalloc(int64_t num, int64_t size) {
|
|||
return (char *)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
#ifdef BUILD_WITH_RAND_ERR
|
||||
if (tsEnableRandErr) {
|
||||
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) {
|
||||
uint32_t r = taosRand() % tsRandErrDivisor;
|
||||
if ((r + 1) <= tsRandErrChance) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -342,7 +344,7 @@ void *taosMemoryRealloc(void *ptr, int64_t size) {
|
|||
return (char *)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
#ifdef BUILD_WITH_RAND_ERR
|
||||
if (tsEnableRandErr) {
|
||||
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) {
|
||||
uint32_t r = taosRand() % tsRandErrDivisor;
|
||||
if ((r + 1) <= tsRandErrChance) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -377,7 +379,7 @@ char *taosStrdup(const char *ptr) {
|
|||
return (char *)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
#ifdef BUILD_WITH_RAND_ERR
|
||||
if (tsEnableRandErr) {
|
||||
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) {
|
||||
uint32_t r = taosRand() % tsRandErrDivisor;
|
||||
if ((r + 1) <= tsRandErrChance) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -443,7 +445,7 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
|
|||
#else
|
||||
#if defined(LINUX)
|
||||
#ifdef BUILD_WITH_RAND_ERR
|
||||
if (tsEnableRandErr) {
|
||||
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_MEMORY)) {
|
||||
uint32_t r = taosRand() % tsRandErrDivisor;
|
||||
if ((r + 1) <= tsRandErrChance) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -19,6 +19,10 @@
|
|||
|
||||
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
|
||||
#include "cus_name.h"
|
||||
#else
|
||||
#ifndef CUS_PROMPT
|
||||
#define CUS_PROMPT "taos"
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#define PROCESS_ITEM 12
|
||||
|
@ -987,7 +991,7 @@ void taosKillSystem() {
|
|||
exit(0);
|
||||
#else
|
||||
// SIGINT
|
||||
(void)printf("taosd will shut down soon");
|
||||
(void)printf("%sd will shut down soon", CUS_PROMPT);
|
||||
(void)kill(tsProcId, 2);
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -1909,6 +1909,8 @@ class TDCom:
|
|||
if latency < self.stream_timeout:
|
||||
latency += 1
|
||||
time.sleep(1)
|
||||
else:
|
||||
return False
|
||||
return tbname
|
||||
|
||||
def get_group_id_from_stb(self, stbname):
|
||||
|
|
|
@ -22,6 +22,10 @@
|
|||
|
||||
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
|
||||
#include "cus_name.h"
|
||||
#else
|
||||
#ifndef CUS_PROMPT
|
||||
#define CUS_PROMPT "taos"
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#define TAOS_CONSOLE_PROMPT_CONTINUE " -> "
|
||||
|
@ -57,7 +61,7 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg);
|
|||
|
||||
void shellPrintHelp() {
|
||||
char indent[] = " ";
|
||||
printf("Usage: taos [OPTION...] \r\n\r\n");
|
||||
printf("Usage: %s [OPTION...] \r\n\r\n", CUS_PROMPT);
|
||||
printf("%s%s%s%s\r\n", indent, "-a,", indent, SHELL_AUTH);
|
||||
printf("%s%s%s%s\r\n", indent, "-A,", indent, SHELL_GEN_AUTH);
|
||||
printf("%s%s%s%s\r\n", indent, "-B,", indent, SHELL_BI_MODE);
|
||||
|
@ -435,11 +439,11 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) {
|
|||
shell.info.promptSize = strlen(shell.info.promptHeader);
|
||||
#ifdef TD_ENTERPRISE
|
||||
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
|
||||
"%s\ntaos version: %s compatible_version: %s\ngit: %s\ngitOfInternal: %s\nbuild: %s", TD_PRODUCT_NAME,
|
||||
version, compatible_version, gitinfo, gitinfoOfInternal, buildinfo);
|
||||
"%s\n%s version: %s compatible_version: %s\ngit: %s\ngitOfInternal: %s\nbuild: %s", TD_PRODUCT_NAME,
|
||||
CUS_PROMPT, version, compatible_version, gitinfo, gitinfoOfInternal, buildinfo);
|
||||
#else
|
||||
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
|
||||
"%s\ntaos version: %s compatible_version: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, version,
|
||||
"%s\n%s version: %s compatible_version: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, CUS_PROMPT, version,
|
||||
compatible_version, gitinfo, buildinfo);
|
||||
#endif
|
||||
|
||||
|
|
Loading…
Reference in New Issue