merge main

This commit is contained in:
Xiaoyu Wang 2023-01-28 09:24:30 +08:00
commit 83a711c37f
57 changed files with 631 additions and 290 deletions

View File

@ -430,7 +430,7 @@ pipeline {
date
rm -rf ${WKC}/debug
cd ${WKC}/tests/parallel_test
time ./container_build.sh -w ${WKDIR} -t 10 -e
time ./container_build.sh -w ${WKDIR} -e
'''
def extra_param = ""
def log_server_file = "/home/log_server.json"

View File

@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.0.2.2")
SET(TD_VER_NUMBER "3.0.2.4")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG f80dd7e
GIT_TAG 7d24ed5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -907,6 +907,7 @@ typedef struct {
int32_t numOfRetensions;
SArray* pRetensions;
int8_t schemaless;
int16_t sstTrigger;
} SDbCfgRsp;
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);

View File

@ -193,7 +193,7 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);

View File

@ -743,6 +743,34 @@ function is_version_compatible() {
esac
}
deb_erase() {
confirm=""
while [ "" == "${confirm}" ]; do
echo -e -n "${RED}Exist tdengine deb detected, do you want to remove it? [yes|no] ${NC}:"
read confirm
if [ "yes" == "$confirm" ]; then
${csudo}dpkg --remove tdengine ||:
break
elif [ "no" == "$confirm" ]; then
break
fi
done
}
rpm_erase() {
confirm=""
while [ "" == "${confirm}" ]; do
echo -e -n "${RED}Exist tdengine rpm detected, do you want to remove it? [yes|no] ${NC}:"
read confirm
if [ "yes" == "$confirm" ]; then
${csudo}rpm -e tdengine ||:
break
elif [ "no" == "$confirm" ]; then
break
fi
done
}
function updateProduct() {
# Check if version compatible
if ! is_version_compatible; then
@ -755,6 +783,13 @@ function updateProduct() {
echo "File ${tarName} does not exist"
exit 1
fi
if echo $osinfo | grep -qwi "centos"; then
rpm -q tdengine 2>&1 > /dev/null && rpm_erase tdengine ||:
elif echo $osinfo | grep -qwi "ubuntu"; then
dpkg -l tdengine 2>&1 > /dev/null && deb_erase tdengine ||:
fi
tar -zxf ${tarName}
install_jemalloc

View File

@ -357,6 +357,7 @@ void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->pDb);
doFreeReqResultInfo(&pRequest->body.resInfo);
tsem_destroy(&pRequest->body.rspSem);
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
@ -371,6 +372,9 @@ void doDestroyRequest(void *p) {
}
if (pRequest->syncQuery) {
if (pRequest->body.param){
tsem_destroy(&((SSyncQueryParam*)pRequest->body.param)->sem);
}
taosMemoryFree(pRequest->body.param);
}
@ -388,45 +392,6 @@ void destroyRequest(SRequestObj *pRequest) {
removeRequest(pRequest->self);
}
void taosClientCrash(int signum, void *sigInfo, void *context) {
taosIgnSignal(SIGTERM);
taosIgnSignal(SIGHUP);
taosIgnSignal(SIGINT);
taosIgnSignal(SIGBREAK);
#if !defined(WINDOWS)
taosIgnSignal(SIGBUS);
#endif
taosIgnSignal(SIGABRT);
taosIgnSignal(SIGFPE);
taosIgnSignal(SIGSEGV);
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
int64_t msgLen= -1;
if (tsEnableCrashReport) {
if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
goto _return;
} else {
msgLen = strlen(pMsg);
}
}
_return:
taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
#ifdef _TD_DARWIN_64
exit(signum);
#elif defined(WINDOWS)
exit(signum);
#endif
}
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
static void *tscCrashReportThreadFp(void *param) {
@ -523,15 +488,26 @@ void tscStopCrashReport() {
}
}
static void tscSetSignalHandle() {
#if !defined(WINDOWS)
taosSetSignal(SIGBUS, taosClientCrash);
#endif
taosSetSignal(SIGABRT, taosClientCrash);
taosSetSignal(SIGFPE, taosClientCrash);
taosSetSignal(SIGSEGV, taosClientCrash);
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
int64_t msgLen= -1;
if (tsEnableCrashReport) {
if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
} else {
msgLen = strlen(pMsg);
}
}
taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
}
void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
@ -555,8 +531,6 @@ void taos_init_imp(void) {
return;
}
tscSetSignalHandle();
initQueryModuleMsgHandle();
if (taosConvInit() != 0) {

View File

@ -159,6 +159,12 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
}
void freeQueryParam(SSyncQueryParam* param) {
if (param == NULL) return;
tsem_destroy(&param->sem);
taosMemoryFree(param);
}
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
SRequestObj** pRequest, int64_t reqid) {
*pRequest = createRequest(connId, TSDB_SQL_SELECT, reqid);
@ -180,17 +186,18 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql;
SSyncQueryParam* newpParam;
if (param == NULL) {
SSyncQueryParam* pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (pParam == NULL) {
newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (newpParam == NULL) {
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
pParam->pRequest = (*pRequest);
param = pParam;
tsem_init(&newpParam->sem, 0, 0);
newpParam->pRequest = (*pRequest);
param = newpParam;
}
(*pRequest)->body.param = param;
@ -201,8 +208,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
if (err) {
tscError("%" PRId64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s",
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
taosMemoryFree(param);
freeQueryParam(newpParam);
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
@ -214,6 +220,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
tscError("%" PRId64 " failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s",
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
freeQueryParam(newpParam);
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_OUT_OF_MEMORY;

View File

@ -509,9 +509,8 @@ void taos_stop_query(TAOS_RES *res) {
SRequestObj *pRequest = (SRequestObj *)res;
pRequest->killed = true;
int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop.
if (numOfFields == 0) {
if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) {
tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return;
}

View File

@ -1448,6 +1448,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
taosMemoryFree(subReq);
return code;
}
@ -1639,6 +1640,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
taosMemoryFree(subReq);
return code;
}

View File

@ -301,12 +301,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
continue;
}
if (STMT_TYPE_MULTI_INSERT == pStmt->sql.type) {
qFreeStmtDataBlock(pBlocks);
} else {
qDestroyStmtDataBlock(pBlocks);
}
qDestroyStmtDataBlock(pBlocks);
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);

View File

@ -2821,8 +2821,8 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI8(&encoder, pRsp->schemaless) < 0) return -1;
if (tEncodeI16(&encoder, pRsp->sstTrigger) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
@ -2873,6 +2873,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
}
}
if (tDecodeI8(&decoder, &pRsp->schemaless) < 0) return -1;
if (tDecodeI16(&decoder, &pRsp->sstTrigger) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -889,7 +889,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions;
cfgRsp.schemaless = pDb->cfg.schemaless;
cfgRsp.sstTrigger = pDb->cfg.sstTrigger;
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {

View File

@ -153,6 +153,8 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur);
#endif
// tsdb

View File

@ -311,7 +311,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
}
}
int metaTbCursorNext(SMTbCursor *pTbCur) {
int32_t metaTbCursorNext(SMTbCursor *pTbCur) {
int ret;
void *pBuf;
STbCfg tbCfg;
@ -335,6 +335,31 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
return 0;
}
int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
int ret;
void *pBuf;
STbCfg tbCfg;
for (;;) {
ret = tdbTbcPrev(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
if (ret < 0) {
return -1;
}
tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
continue;
}
break;
}
return 0;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
void *pData = NULL;
int nData = 0;

View File

@ -164,7 +164,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
destroyLastBlockLoadInfo(p->pLoadInfo);
taosMemoryFree((void*) p->idstr);
taosMemoryFree((void*)p->idstr);
taosMemoryFree(pReader);
return NULL;
}
@ -241,7 +241,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayPush(pLastCols, &p);
}
tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l");
code = tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l");
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL;
@ -252,7 +256,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _end;
}
if (h == NULL) {
@ -321,7 +325,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _end;
}
if (h == NULL) {

View File

@ -458,9 +458,8 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree(pHeadF);
}
} else {
nRef = pHeadF->nRef;
*pHeadF = *pSetNew->pHeadF;
pHeadF->nRef = nRef;
ASSERT(pHeadF->offset == pSetNew->pHeadF->offset);
ASSERT(pHeadF->size == pSetNew->pHeadF->size);
}
// data
@ -481,9 +480,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree(pDataF);
}
} else {
nRef = pDataF->nRef;
*pDataF = *pSetNew->pDataF;
pDataF->nRef = nRef;
pDataF->size = pSetNew->pDataF->size;
}
// sma
@ -504,9 +501,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree(pSmaF);
}
} else {
nRef = pSmaF->nRef;
*pSmaF = *pSetNew->pSmaF;
pSmaF->nRef = nRef;
pSmaF->size = pSetNew->pSmaF->size;
}
// stt

View File

@ -1758,11 +1758,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
if (minKey == k.ts) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
if (init) {
tRowMerge(&merge, pRow);
tRowMergerAdd(&merge, pRow, pSchema);
} else {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -3884,7 +3887,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _err;
}
} else {
STsdbReader* pPrevReader = pReader->innerReader[0];
@ -3905,7 +3908,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _err;
}
}
}

View File

@ -731,6 +731,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if (key.version > pMerger->version) {
#if 0
if (!COL_VAL_IS_NONE(pColVal)) {
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
@ -746,6 +747,28 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
#endif
if (!COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) return code;
pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
}
pTColVal->flag = 0;
} else {
tFree(pTColVal->value.pData);
pTColVal->value.pData = NULL;
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
} else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {

View File

@ -805,6 +805,7 @@ int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb);
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName);
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;

View File

@ -598,10 +598,16 @@ int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInf
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup));
ctgRUnlockVgInfo(dbCache);
SCtgTbMetaCtx ctx = {0};
ctx.pName = (SName*)pTableName;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db));
code = ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
CTG_RET(code);
_return:

View File

@ -999,6 +999,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;
} else {
SBuildUseDBInput input = {0};
@ -1168,6 +1169,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;
} else {
SBuildUseDBInput input = {0};

View File

@ -2118,7 +2118,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
_return:
if (dbCache) {
if (code == TSDB_CODE_SUCCESS && dbCache) {
ctgWUnlockVgInfo(dbCache);
}

View File

@ -264,10 +264,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
len += sprintf(
buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile,
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2,
pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables);

View File

@ -705,7 +705,8 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);

View File

@ -584,7 +584,13 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
int32_t index = 0;
int32_t code = 0;
while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
SSDataBlock* pb = NULL;
if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
blockDataCleanup(pb);
} else {
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
}
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) {
@ -732,9 +738,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
}
// reset the value for a new group data
pLimitInfo->numOfOutputRows = 0;
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
resetLimitInfoForNextGroup(pLimitInfo);
// existing rows that belongs to previous group.
if (pBlock->info.rows > 0) {
return PROJECT_RETRIEVE_DONE;
@ -760,7 +764,12 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
pOperator->status = OP_EXEC_DONE;
setOperatorCompleted(pOperator);
} else {
// current group limitation is reached, and future blocks of this group need to be discarded.
if (pBlock->info.rows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
}
return PROJECT_RETRIEVE_DONE;

View File

@ -954,7 +954,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
return -1;
}
} else {
qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
// qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
terrno = 0;
}
}
@ -1759,6 +1759,11 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo->remainGroupOffset = slimit.offset;
}
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
pLimitInfo->numOfOutputRows = 0;
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
}
uint64_t tableListGetSize(const STableListInfo* pTableList) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
return taosArrayGetSize(pTableList->pTableList);

View File

@ -24,12 +24,16 @@
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1;
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
static void cleanupRefPool() {
int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
taosCloseRef(ref);
}
static void initRefPool() {
exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
atexit(cleanupRefPool);
}
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
@ -442,7 +446,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
atexit(cleanupRefPool);
qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);

View File

@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId);
if (pPage == NULL) {
return pPage;
}
taosArrayPush(p->pPageList, &pageId);
*(int32_t*)pPage = 0;
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
// add a new page for current group
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, &pageId);
if (pPage == NULL) {
qError("failed to get new buffer, code:%s", tstrerror(terrno));
return NULL;
}
taosArrayPush(p->pPageList, &pageId);
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
}

View File

@ -175,8 +175,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
// reset the value for a new group data
// existing rows that belongs to previous group.
pLimitInfo->numOfOutputRows = 0;
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
resetLimitInfoForNextGroup(pLimitInfo);
}
return PROJECT_RETRIEVE_DONE;
@ -200,10 +199,18 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
// TODO: optimize it later when partition by + limit
// all retrieved requirement has been fulfilled, let's finish this
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator);
} else {
// Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data
// from next group. So let's continue this retrieve process
if (keepRows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
}
}
@ -357,7 +364,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
// printDataBlock1(p, "project");
return (p->info.rows > 0) ? p : NULL;
}

View File

@ -257,7 +257,7 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
}
// todo handle the slimit info
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
SLimit* pLimit = &pLimitInfo->limit;
const char* id = GET_TASKID(pTaskInfo);
@ -266,6 +266,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataEmpty(pBlock);
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
return false;
} else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
@ -274,13 +275,14 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
// limit the output rows
int32_t overflowRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
int32_t keep = pBlock->info.rows - overflowRows;
int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keep);
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
pOperator->status = OP_EXEC_DONE;
return true;
}
return false;
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
@ -391,7 +393,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
}
}
applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
if (limitReached) { // set operator flag is done
setOperatorCompleted(pOperator);
}
pCost->totalRows += pBlock->info.rows;
pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows;
@ -768,8 +773,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// reset value for the next group data output
pOperator->status = OP_OPENED;
pInfo->base.limitInfo.numOfOutputRows = 0;
pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset;
resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
int32_t num = 0;
STableKeyInfo* pList = NULL;
@ -2685,9 +2689,12 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
taosArrayDestroy(pInfo->queryConds);
pInfo->queryConds = NULL;
resetLimitInfoForNextGroup(&pInfo->limitInfo);
return TSDB_CODE_SUCCESS;
}
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
@ -2707,10 +2714,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
}
}
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}
@ -2749,11 +2758,13 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
} else {
// Data of this group are all dumped, let's try the next group
stopGroupTableMergeScan(pOperator);
if (pInfo->tableEndIndex >= tableListSize - 1) {
setOperatorCompleted(pOperator);
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator);

View File

@ -680,11 +680,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
break;
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
if (limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
if (p->info.rows > 0) {
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
if (p->info.rows > 0) {
break;
}
break;
}
}
@ -698,7 +700,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
}
pInfo->limitInfo.numOfOutputRows += p->info.rows;
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.id.groupId = pInfo->groupId;
pDataBlock->info.dataLoad = 1;

View File

@ -491,6 +491,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
bool blockFull = false;
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
continue;
@ -512,17 +513,24 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
metaTbCursorPrev(pInfo->pCur);
blockFull = true;
} else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);
}
metaReaderClear(&smrSuperTable);
if (numOfRows >= pOperator->resultInfo.capacity) {
if (blockFull || numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
break;
}
blockFull = false;
}
}

View File

@ -3061,14 +3061,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
if (pHandle->currentPage == -1) {
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
} else {
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
@ -3076,7 +3074,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
releaseBufPage(pHandle->pBuf, pPage);
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
@ -3123,7 +3120,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
memcpy(pPage->data + pPos->offset, pBuf, length);

View File

@ -43,8 +43,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if (pg == NULL) {
return NULL;
}
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes);
}
@ -109,7 +109,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
int32_t *pageId = taosArrayGet(list, 0);
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
if (pPage == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
ASSERT(pPage->num == 1);
@ -276,7 +276,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
return NULL;
}
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir);
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir);
if (ret != 0) {
tMemBucketDestroy(pBucket);
return NULL;
@ -388,7 +388,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
if (pSlot->info.data == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId);
@ -482,8 +482,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
// data in buffer and file are merged together to be processed.
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
if (buffer == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
int32_t currentIdx = count - num;
char *thisVal = buffer->data + pMemBucket->bytes * currentIdx;
@ -520,7 +521,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
int32_t *pageId = taosArrayGet(list, f);
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);

View File

@ -426,6 +426,27 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
pBlock->pTableMeta = pNewMeta;
}
if (pBlock->boundColumnInfo.boundColumns) {
int32_t size = pBlock->boundColumnInfo.numOfCols * sizeof(col_id_t);
void* tmp = taosMemoryMalloc(size);
memcpy(tmp, pBlock->boundColumnInfo.boundColumns, size);
pBlock->boundColumnInfo.boundColumns = tmp;
}
if (pBlock->boundColumnInfo.cols) {
int32_t size = pBlock->boundColumnInfo.numOfCols * sizeof(SBoundColumn);
void* tmp = taosMemoryMalloc(size);
memcpy(tmp, pBlock->boundColumnInfo.cols, size);
pBlock->boundColumnInfo.cols = tmp;
}
if (pBlock->boundColumnInfo.colIdxInfo) {
int32_t size = pBlock->boundColumnInfo.numOfBound * sizeof(SBoundIdxInfo);
void* tmp = taosMemoryMalloc(size);
memcpy(tmp, pBlock->boundColumnInfo.colIdxInfo, size);
pBlock->boundColumnInfo.colIdxInfo = tmp;
}
return qResetStmtDataBlock(*pDst, false);
}
@ -438,7 +459,7 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgI
STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
if (NULL == pBlock->pData) {
qFreeStmtDataBlock(pBlock);
qDestroyStmtDataBlock(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -1080,29 +1080,29 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
return false;
}
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
if (pSort->groupSort || !sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
if (!sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
return false;
}
return true;
}
static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool* pNotOptimize,
static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool groupSort, bool* pNotOptimize,
SNodeList** pSequencingNodes) {
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
if (NULL != pScan->pGroupTags || TSDB_SYSTEM_TABLE == pScan->tableType) {
if ((!groupSort && NULL != pScan->pGroupTags) || TSDB_SYSTEM_TABLE == pScan->tableType) {
*pNotOptimize = true;
return TSDB_CODE_SUCCESS;
}
return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode);
}
case QUERY_NODE_LOGIC_PLAN_JOIN: {
int32_t code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0),
int32_t code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), groupSort,
pNotOptimize, pSequencingNodes);
if (TSDB_CODE_SUCCESS == code) {
code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize,
pSequencingNodes);
code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), groupSort,
pNotOptimize, pSequencingNodes);
}
return code;
}
@ -1121,13 +1121,13 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool* pNot
return TSDB_CODE_SUCCESS;
}
return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize,
pSequencingNodes);
return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), groupSort,
pNotOptimize, pSequencingNodes);
}
static int32_t sortPriKeyOptGetSequencingNodes(SLogicNode* pNode, SNodeList** pSequencingNodes) {
static int32_t sortPriKeyOptGetSequencingNodes(SLogicNode* pNode, bool groupSort, SNodeList** pSequencingNodes) {
bool notOptimize = false;
int32_t code = sortPriKeyOptGetSequencingNodesImpl(pNode, &notOptimize, pSequencingNodes);
int32_t code = sortPriKeyOptGetSequencingNodesImpl(pNode, groupSort, &notOptimize, pSequencingNodes);
if (TSDB_CODE_SUCCESS != code || notOptimize) {
NODES_CLEAR_LIST(*pSequencingNodes);
}
@ -1175,8 +1175,8 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
static int32_t sortPrimaryKeyOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort) {
SNodeList* pSequencingNodes = NULL;
int32_t code =
sortPriKeyOptGetSequencingNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pSequencingNodes);
int32_t code = sortPriKeyOptGetSequencingNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0),
pSort->groupSort, &pSequencingNodes);
if (TSDB_CODE_SUCCESS == code && NULL != pSequencingNodes) {
code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pSequencingNodes);
}

View File

@ -228,9 +228,14 @@ typedef struct SQWorkerMgmt {
case QW_PHASE_POST_FETCH: \
ctx->inFetch = 0; \
break; \
default: \
case QW_PHASE_PRE_QUERY: \
case QW_PHASE_POST_QUERY: \
case QW_PHASE_PRE_CQUERY: \
case QW_PHASE_POST_CQUERY: \
atomic_store_8(&(ctx)->phase, _value); \
break; \
default: \
break; \
} \
} while (0)

View File

@ -551,7 +551,9 @@ _return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
QW_SET_PHASE(ctx, phase);
if (QW_PHASE_POST_CQUERY != phase) {
QW_SET_PHASE(ctx, phase);
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx);
@ -758,7 +760,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_LOCK(QW_WRITE, &ctx->lock);
if (qComplete || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code) {
// Note: query is not running anymore
QW_SET_PHASE(ctx, 0);
QW_SET_PHASE(ctx, QW_PHASE_POST_CQUERY);
QW_UNLOCK(QW_WRITE, &ctx->lock);
break;
}

View File

@ -357,7 +357,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(pAppendEntry->index == appendIndex);
// append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex);
@ -398,7 +398,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
// append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex);

View File

@ -2477,7 +2477,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
LRUHandle* h = NULL;
if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
if (code != 0) {
sError("append noop error");
return -1;
@ -2720,7 +2720,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
if (ths->state == TAOS_SYNC_STATE_LEADER) {
// append entry
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
if (code != 0) {
if (ths->replicaNum == 1) {
if (h) {

View File

@ -364,7 +364,11 @@ _out:
return ret;
}
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
return (replicaNum > 1) && (pEntry->originalRpcType == TDMT_VND_COMMIT);
}
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
ASSERT(pEntry->index >= 0);
SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
@ -374,7 +378,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer + 1);
if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) {
bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) {
sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
pEntry->term);
return -1;
@ -436,7 +441,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
(void)syncNodeReplicateWithoutLock(pNode);
// persist
if (syncLogStorePersist(pLogStore, pEntry) < 0) {
if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
pEntry->index);
goto _out;

View File

@ -23,7 +23,7 @@
// public function
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync);
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
@ -192,9 +192,7 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
return SYNC_TERM_INVALID;
}
static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); }
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
@ -221,7 +219,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
ASSERT(pEntry->index == index);
bool forceSync = raftLogForceSync(pEntry);
walFsync(pWal, forceSync);
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,

View File

@ -834,7 +834,11 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower(uuid, buf);
memcpy(uid, buf, uidlen);
int n = snprintf(uid, uidlen, "%.*s", (int)sizeof(buf), buf); // though less performance, much safer
if (n >= uidlen) {
// target buffer is too small
return -1;
}
return 0;
#else
int len = 0;

View File

@ -20,7 +20,10 @@
// todo refactor API
SArray* taosArrayInit(size_t size, size_t elemSize) {
assert(elemSize > 0);
if (elemSize == 0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if (size < TARRAY_MIN_SIZE) {
size = TARRAY_MIN_SIZE;
@ -96,8 +99,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
}
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
assert(pArray);
size_t size = pArray->size;
if (size <= 1) {
return;
@ -136,8 +137,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
assert(pArray);
size_t size = pArray->size;
if (size <= 1) {
return;
@ -195,11 +194,10 @@ void* taosArrayReserve(SArray* pArray, int32_t num) {
}
void* taosArrayPop(SArray* pArray) {
assert(pArray != NULL);
if (pArray->size == 0) {
return NULL;
}
pArray->size -= 1;
return TARRAY_GET_ELEM(pArray, pArray->size);
}
@ -208,16 +206,21 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
if (NULL == pArray) {
return NULL;
}
assert(index < pArray->size);
if (index >= pArray->size) {
uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity);
return NULL;
}
return TARRAY_GET_ELEM(pArray, index);
}
void* taosArrayGetP(const SArray* pArray, size_t index) {
assert(index < pArray->size);
void* d = TARRAY_GET_ELEM(pArray, index);
return *(void**)d;
void** p = taosArrayGet(pArray, index);
if (p == NULL) {
return NULL;
}
return *p;
}
void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); }
@ -296,9 +299,12 @@ void taosArrayRemove(SArray* pArray, size_t index) {
}
SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
assert(src != NULL && elemSize > 0);
SArray* pDst = taosArrayInit(size, elemSize);
if (elemSize <= 0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
SArray* pDst = taosArrayInit(size, elemSize);
memcpy(pDst->pData, src, elemSize * size);
pDst->size = size;
@ -306,8 +312,6 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
}
SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) {
assert(pSrc != NULL);
if (pSrc->size == 0) { // empty array list
return taosArrayInit(8, pSrc->elemSize);
}
@ -399,14 +403,10 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
}
void taosArraySort(SArray* pArray, __compar_fn_t compar) {
ASSERT(pArray != NULL && compar != NULL);
taosSort(pArray->pData, pArray->size, pArray->elemSize, compar);
}
void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) {
assert(pArray != NULL && comparFn != NULL);
assert(key != NULL);
return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
}

View File

@ -921,7 +921,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1)
void taosStopCacheRefreshWorker(void) {
stopRefreshWorker = true;
TdThreadOnce tmp = PTHREAD_ONCE_INIT;
if (memcmp(&cacheRefreshWorker, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL);
if (memcmp(&cacheThreadInit, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL);
taosArrayDestroy(pCacheArrayList);
}

View File

@ -897,6 +897,7 @@ void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, vo
pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosPrintLog(flags, level, dflag, "failed to open file:%s since %s", filepath, terrstr());
goto _return;
}

View File

@ -5,7 +5,10 @@
#include "thash.h"
#include "tlog.h"
#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef struct SPageDiskInfo {
@ -14,7 +17,7 @@ typedef struct SPageDiskInfo {
} SPageDiskInfo, SFreeListItem;
struct SPageInfo {
SListNode* pn; // point to list node struct
SListNode* pn; // point to list node struct. it is NULL when the page is evicted from the in-memory buffer
void* pData;
int64_t offset;
int32_t pageId;
@ -89,7 +92,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba
return data;
}
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
if (pBuf->pFree == NULL) {
return pBuf->nextPos;
} else {
@ -112,10 +115,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
}
}
static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; }
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
/**
* +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
@ -124,23 +123,31 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize
* @param pg
* @return
*/
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
ASSERT(!pg->used && pg->pData != NULL);
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->pData == NULL || pg->used) {
uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
int32_t size = pBuf->pageSize;
char* t = NULL;
if (pg->offset == -1 || pg->dirty) {
void* payload = GET_DATA_PAYLOAD(pg);
if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
void* payload = GET_PAYLOAD_DATA(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
ASSERTS(size >= 0, "size is negative");
if (size < 0) {
uError("failed to compress data when flushing data to disk, %s", pBuf->id);
return NULL;
}
}
// this page is flushed to disk for the first time
if (pg->dirty) {
if (pg->offset == -1) {
ASSERTS(pg->dirty == true, "pg->dirty is false");
pg->offset = allocatePositionInFile(pBuf, size);
if (!HAS_DATA_IN_DISK(pg)) {
pg->offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size;
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
@ -155,6 +162,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return NULL;
}
// extend the file size
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
@ -169,7 +177,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
taosArrayPush(pBuf->pFree, &dinfo);
// 2. allocate new position, and update the info
pg->offset = allocatePositionInFile(pBuf, size);
pg->offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size;
}
@ -197,20 +205,19 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
size = pg->length;
}
ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
char* pDataBuf = pg->pData;
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
#ifdef BUF_PAGE_DEBUG
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
#endif
pg->length = size; // on disk size
return pDataBuf;
}
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS;
ASSERT(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);
if (pBuf->pFile == NULL) {
if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) {
@ -219,22 +226,27 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
}
}
char* p = doFlushPageToDisk(pBuf, pg);
setPageNotInBuf(pg);
pg->dirty = false;
char* p = doFlushBufPage(pBuf, pg);
CLEAR_BUF_PAGE_IN_MEM_FLAG(pg);
pg->dirty = false;
return p;
}
// load file block data in disk
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->offset < 0 || pg->length <= 0) {
uError("failed to load buf page from disk, offset:%"PRId64", length:%d, %s", pg->offset, pg->length, pBuf->id);
return TSDB_CODE_INVALID_PARA;
}
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
if (ret == -1) {
ret = TAOS_SYSTEM_ERROR(errno);
return ret;
}
void* pPage = (void*)GET_DATA_PAYLOAD(pg);
void* pPage = (void*)GET_PAYLOAD_DATA(pg);
ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
if (ret != pg->length) {
ret = TAOS_SYSTEM_ERROR(errno);
@ -249,10 +261,14 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return 0;
}
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) {
static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
pBuf->numOfPages += 1;
SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
if (ppi == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
ppi->pageId = pageId;
ppi->pData = NULL;
@ -272,46 +288,33 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SListNode* pn = NULL;
while ((pn = tdListNext(&iter)) != NULL) {
SPageInfo* pageInfo = *(SPageInfo**)pn->data;
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn);
SPageInfo* p = *(SPageInfo**)(pageInfo->pData);
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo);
if (!pageInfo->used) {
// printf("%d is chosen\n", pageInfo->pageId);
break;
} else {
// printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty);
}
}
return pn;
}
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
char* bufPage = NULL;
static char* evictBufPage(SDiskbasedBuf* pBuf) {
SListNode* pn = getEldestUnrefedPage(pBuf);
terrno = 0;
// all pages are referenced by user, try to allocate new space
if (pn == NULL) {
int32_t prev = pBuf->inMemPages;
// increase by 50% of previous mem pages
pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev,
// pBuf->inMemPages, pBuf->pageSize);
} else {
tdListPopNode(pBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**)pn->data;
ASSERTS(d->pn == pn, "d->pn not equal pn");
d->pn = NULL;
taosMemoryFreeClear(pn);
bufPage = flushPageToDisk(pBuf, d);
if (pn == NULL) { // no available buffer pages now, return.
return NULL;
}
return bufPage;
terrno = 0;
tdListPopNode(pBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**)pn->data;
d->pn = NULL;
taosMemoryFreeClear(pn);
return flushBufPage(pBuf, d);
}
static void lruListPushFront(SList* pList, SPageInfo* pi) {
@ -338,13 +341,12 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
SDiskbasedBuf* pPBuf = *pBuf;
if (pPBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pPBuf->pageSize = pagesize;
pPBuf->numOfPages = 0; // all pages are in buffer in the first place
pPBuf->totalBufSize = 0;
pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
pPBuf->allocateId = -1;
pPBuf->pFile = NULL;
pPBuf->id = strdup(id);
@ -353,33 +355,69 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf->freePgList = tdListNew(POINTER_BYTES);
// at least more than 2 pages must be in memory
ASSERT(inMemBufSize >= pagesize * 2);
if (inMemBufSize < pagesize * 2) {
inMemBufSize = pagesize * 2;
}
pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
pPBuf->lruList = tdListNew(POINTER_BYTES);
if (pPBuf->lruList == NULL) {
goto _error;
}
// init id hash table
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
if (pPBuf->pIdList == NULL) {
goto _error;
}
pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES
pPBuf->all = taosHashInit(10, fn, true, false);
pPBuf->prefix = (char*) dir;
if (pPBuf->assistBuf == NULL) {
goto _error;
}
pPBuf->all = taosHashInit(10, fn, true, false);
if (pPBuf->all == NULL) {
goto _error;
}
pPBuf->prefix = (char*) dir;
pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pPBuf->pageSize,
// pPBuf->inMemPages, pPBuf->path);
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
return TSDB_CODE_SUCCESS;
_error:
destroyDiskbasedBuf(pPBuf);
return TSDB_CODE_OUT_OF_MEMORY;
}
static char* doExtractPage(SDiskbasedBuf* pBuf) {
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evictBufPage(pBuf);
if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages)
}
} else {
availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
}
return availablePage;
}
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
pBuf->statis.getPages += 1;
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evacOneDataPage(pBuf);
char* availablePage = doExtractPage(pBuf);
if (availablePage == NULL) {
return NULL;
}
SPageInfo* pi = NULL;
@ -394,7 +432,10 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
*pageId = (++pBuf->allocateId);
// register page id info
pi = registerPage(pBuf, *pageId);
pi = registerNewPageInfo(pBuf, *pageId);
if (pi == NULL) {
return NULL;
}
// add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
@ -402,63 +443,62 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
}
// add to LRU list
ASSERT(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
lruListPushFront(pBuf->lruList, pi);
// allocate buf
if (availablePage == NULL) {
pi->pData =
taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
} else {
pi->pData = availablePage;
}
pi->pData = availablePage;
((void**)pi->pData)[0] = pi;
#ifdef BUF_PAGE_DEBUG
uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
#endif
return (void*)(GET_DATA_PAYLOAD(pi));
return (void*)(GET_PAYLOAD_DATA(pi));
}
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
ASSERT(pBuf != NULL && id >= 0);
if (id < 0) {
terrno = TSDB_CODE_INVALID_PARA;
uError("invalid page id:%d, %s", id, pBuf->id);
return NULL;
}
pBuf->statis.getPages += 1;
SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
ASSERT(pi != NULL && *pi != NULL);
if (pi == NULL || *pi == NULL) {
uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if ((*pi)->pData != NULL) { // it is in memory
if (BUF_PAGE_IN_MEM(*pi)) { // it is in memory
// no need to update the LRU list if only one page exists
if (pBuf->numOfPages == 1) {
(*pi)->used = true;
return (void*)(GET_DATA_PAYLOAD(*pi));
return (void*)(GET_PAYLOAD_DATA(*pi));
}
SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
ASSERT(*pInfo == *pi);
if (*pInfo != *pi) {
uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id);
return NULL;
}
lruListMoveToFront(pBuf->lruList, (*pi));
(*pi)->used = true;
#ifdef BUF_PAGE_DEBUG
uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
#endif
return (void*)(GET_DATA_PAYLOAD(*pi));
return (void*)(GET_PAYLOAD_DATA(*pi));
} else { // not in memory
ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL &&
ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evacOneDataPage(pBuf);
if (availablePage == NULL) {
return NULL;
}
}
(*pi)->pData = doExtractPage(pBuf);
if (availablePage == NULL) {
(*pi)->pData = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
} else {
(*pi)->pData = availablePage;
// failed to evict buffer page, return with error code.
if ((*pi)->pData == NULL) {
return NULL;
}
// set the ptr to the new SPageInfo
@ -468,23 +508,25 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
(*pi)->used = true;
// some data has been flushed to disk, and needs to be loaded into buffer again.
if ((*pi)->length > 0 && (*pi)->offset >= 0) {
if (HAS_DATA_IN_DISK(*pi)) {
int32_t code = loadPageFromDisk(pBuf, *pi);
if (code != 0) {
terrno = code;
return NULL;
}
}
#ifdef BUF_PAGE_DEBUG
uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
#endif
return (void*)(GET_DATA_PAYLOAD(*pi));
return (void*)(GET_PAYLOAD_DATA(*pi));
}
}
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
if (ASSERTS(pBuf != NULL && page != NULL, "pBuf or page is NULL")) {
if (page == NULL) {
return;
}
SPageInfo* ppi = getPageInfoFromPayload(page);
releaseBufPageInfo(pBuf, ppi);
}
@ -493,7 +535,13 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG
uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
#endif
if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) {
if (pi == NULL) {
return;
}
if (pi->pData == NULL) {
uError("pi->pData (page data) is null");
return;
}
@ -504,7 +552,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) {
ASSERT(pBuf != NULL);
return pBuf->pIdList;
}
@ -582,7 +629,6 @@ SPageInfo* getLastPageInfo(SArray* pList) {
}
int32_t getPageId(const SPageInfo* pPgInfo) {
ASSERT(pPgInfo != NULL);
return pPgInfo->pageId;
}

View File

@ -0,0 +1,82 @@
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import tdDnodes
from math import inf
class TDTestCase:
def caseDescription(self):
'''
case1<shenglian zhou>: [TD-11204]Difference improvement that can ignore negative
'''
return
def init(self, conn, logSql, replicaVer=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), False)
self._conn = conn
def restartTaosd(self, index=1, dbname="db"):
tdDnodes.stop(index)
tdDnodes.startWithoutSleep(index)
tdSql.execute(f"use scd")
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists scd")
tdSql.execute("create database if not exists scd")
tdSql.execute('use scd')
tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);")
tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);")
tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);")
tdSql.execute('create database scd2 stt_trigger 3;')
tdSql.execute('create database scd4 stt_trigger 13;')
tdSql.query('show create database scd;')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd')
tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0")
tdSql.query('show create database scd2;')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd2')
tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0")
tdSql.query('show create database scd4')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd4')
tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0")
self.restartTaosd(1, dbname='scd')
tdSql.query('show create database scd;')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd')
tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0")
tdSql.query('show create database scd2;')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd2')
tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0")
tdSql.query('show create database scd4')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'scd4')
tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0")
tdSql.execute('drop database scd')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -145,6 +145,7 @@
,,y,script,./test.sh -f tsim/parser/precision_ns.sim
,,y,script,./test.sh -f tsim/parser/projection_limit_offset.sim
,,y,script,./test.sh -f tsim/parser/regex.sim
,,y,script,./test.sh -f tsim/parser/regressiontest.sim
,,y,script,./test.sh -f tsim/parser/select_across_vnodes.sim
,,y,script,./test.sh -f tsim/parser/select_distinct_tag.sim
,,y,script,./test.sh -f tsim/parser/select_from_cache_disk.sim
@ -1051,6 +1052,7 @@
#develop test
,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py

View File

@ -37,9 +37,9 @@ if [ -z "$WORKDIR" ]; then
usage
exit 1
fi
if [ -z "$THREAD_COUNT" ]; then
THREAD_COUNT=1
fi
# if [ -z "$THREAD_COUNT" ]; then
# THREAD_COUNT=1
# fi
ulimit -c unlimited
@ -55,7 +55,7 @@ fi
date
docker run \
-v $REP_MOUNT_PARAM \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true;make -j $THREAD_COUNT || exit 1"
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true;make -j || exit 1"
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
echo "delete ${WORKDIR}/debugNoSan"
@ -70,7 +70,7 @@ mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugNoSan
date
docker run \
-v $REP_MOUNT_PARAM \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug;make -j $THREAD_COUNT || exit 1 "
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug;make -j || exit 1 "
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan

View File

@ -184,6 +184,10 @@ function run_thread() {
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.py"|awk '{print $NF}'`
fi
echo "$case_cmd"|grep -q "^./pytest.sh"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.py"|awk '{print $NF}'`
fi
echo "$case_cmd"|grep -q "\.sim"
if [ $? -eq 0 ]; then
case_file=`echo "$case_cmd"|grep -o ".*\.sim"|awk '{print $NF}'`

View File

@ -2828,7 +2828,7 @@ void runAll(TAOS *taos) {
printf("%s Begin\n", gCaseCtrl.caseCatalog);
runCaseList(taos);
#if 0
#if 1
strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.precision = TIME_PRECISION_MICRO;

View File

@ -86,4 +86,23 @@ if $data00 != @ins_tags@ then
return -1
endi
sql create stable stb(ts timestamp, f int) tags(t1 int, t2 int, t3 int, t4 int, t5 int);
$i = 0
$tbNum = 1000
$tbPrefix = stb_tb
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using stb tags( $i , $i , $i , $i , $i )
$i = $i + 1
endw
sql select tag_value from information_schema.ins_tags where stable_name='stb';
if $rows != 5000 then
print $rows
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -147,5 +147,6 @@ void shellRunSingleCommandWebsocketImp(char *command);
// shellMain.c
extern SShellObj shell;
extern void tscWriteCrashInfo(int signum, void *sigInfo, void *context);
#endif /*_TD_SHELL_INT_H_*/

View File

@ -1136,8 +1136,6 @@ int32_t shellExecute() {
taosSetSignal(SIGTERM, shellQueryInterruptHandler);
taosSetSignal(SIGHUP, shellQueryInterruptHandler);
taosSetSignal(SIGABRT, shellQueryInterruptHandler);
taosSetSignal(SIGINT, shellQueryInterruptHandler);
#ifdef WEBSOCKET

View File

@ -19,6 +19,29 @@
SShellObj shell = {0};
void shellCrashHandler(int signum, void *sigInfo, void *context) {
taosIgnSignal(SIGTERM);
taosIgnSignal(SIGHUP);
taosIgnSignal(SIGINT);
taosIgnSignal(SIGBREAK);
#if !defined(WINDOWS)
taosIgnSignal(SIGBUS);
#endif
taosIgnSignal(SIGABRT);
taosIgnSignal(SIGFPE);
taosIgnSignal(SIGSEGV);
tscWriteCrashInfo(signum, sigInfo, context);
#ifdef _TD_DARWIN_64
exit(signum);
#elif defined(WINDOWS)
exit(signum);
#endif
}
int main(int argc, char *argv[]) {
shell.exit = false;
#ifdef WEBSOCKET
@ -26,6 +49,13 @@ int main(int argc, char *argv[]) {
shell.args.cloud = true;
#endif
#if !defined(WINDOWS)
taosSetSignal(SIGBUS, shellCrashHandler);
#endif
taosSetSignal(SIGABRT, shellCrashHandler);
taosSetSignal(SIGFPE, shellCrashHandler);
taosSetSignal(SIGSEGV, shellCrashHandler);
if (shellCheckIntSize() != 0) {
return -1;
}