Merge branch '3.0' into cpwu/3.0
This commit is contained in:
commit
c4fcfc0590
|
@ -50,6 +50,10 @@ pysim/
|
||||||
tests/script/api/batchprepare
|
tests/script/api/batchprepare
|
||||||
taosadapter
|
taosadapter
|
||||||
taosadapter-debug
|
taosadapter-debug
|
||||||
|
tools/taos-tools/*
|
||||||
|
tools/taosws-rs/*
|
||||||
|
tools/taosadapter/*
|
||||||
|
tools/upx*
|
||||||
|
|
||||||
# Doxygen Generated files
|
# Doxygen Generated files
|
||||||
html/
|
html/
|
||||||
|
|
|
@ -146,6 +146,7 @@ struct SConfig *taosGetCfg();
|
||||||
void taosSetAllDebugFlag(int32_t flag);
|
void taosSetAllDebugFlag(int32_t flag);
|
||||||
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
|
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
|
||||||
int32_t taosSetCfg(SConfig *pCfg, char *name);
|
int32_t taosSetCfg(SConfig *pCfg, char *name);
|
||||||
|
void taosLocalCfgForbiddenToChange(char* name, bool* forbidden);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -418,8 +418,6 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal);
|
||||||
|
|
||||||
char* nodesGetFillModeString(EFillMode mode);
|
char* nodesGetFillModeString(EFillMode mode);
|
||||||
int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc);
|
int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc);
|
||||||
int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
|
|
||||||
SNode** pOtherCond);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,10 @@ extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo);
|
||||||
extern void filterFreeInfo(SFilterInfo *info);
|
extern void filterFreeInfo(SFilterInfo *info);
|
||||||
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
|
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
|
||||||
|
|
||||||
|
/* condition split interface */
|
||||||
|
int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond,
|
||||||
|
SNode **pOtherCond);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SFilterInfo SFilterInfo;
|
typedef struct SFilterInfo SFilterInfo;
|
||||||
|
|
||||||
int32_t scalarGetOperatorResultType(SOperatorNode* pOp);
|
int32_t scalarGetOperatorResultType(SOperatorNode *pOp);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
pNode will be freed in API;
|
pNode will be freed in API;
|
||||||
|
@ -43,7 +43,7 @@ int32_t scalarGetOperatorParamNum(EOperatorType type);
|
||||||
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type);
|
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type);
|
||||||
|
|
||||||
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
|
int32_t vectorGetConvertType(int32_t type1, int32_t type2);
|
||||||
int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut, int32_t* overflow);
|
int32_t vectorConvertImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow);
|
||||||
|
|
||||||
/* Math functions */
|
/* Math functions */
|
||||||
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
@ -86,7 +86,7 @@ int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
|
||||||
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getTimePseudoFuncEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
|
||||||
|
|
||||||
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
|
@ -62,6 +62,8 @@ typedef int32_t TdUcs4;
|
||||||
int32_t taosUcs4len(TdUcs4 *ucs4);
|
int32_t taosUcs4len(TdUcs4 *ucs4);
|
||||||
int64_t taosStr2int64(const char *str);
|
int64_t taosStr2int64(const char *str);
|
||||||
|
|
||||||
|
void taosConvInit(void);
|
||||||
|
void taosConvDestroy();
|
||||||
int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs);
|
int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs);
|
||||||
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len);
|
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len);
|
||||||
int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes);
|
int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes);
|
||||||
|
|
|
@ -361,6 +361,8 @@ void taos_init_imp(void) {
|
||||||
|
|
||||||
initQueryModuleMsgHandle();
|
initQueryModuleMsgHandle();
|
||||||
|
|
||||||
|
taosConvInit();
|
||||||
|
|
||||||
rpcInit();
|
rpcInit();
|
||||||
|
|
||||||
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
||||||
|
|
|
@ -353,6 +353,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
||||||
desc.subDesc = NULL;
|
desc.subDesc = NULL;
|
||||||
desc.subPlanNum = 0;
|
desc.subPlanNum = 0;
|
||||||
}
|
}
|
||||||
|
desc.subPlanNum = taosArrayGetSize(desc.subDesc);
|
||||||
ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc));
|
ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc));
|
||||||
} else {
|
} else {
|
||||||
desc.subDesc = NULL;
|
desc.subDesc = NULL;
|
||||||
|
|
|
@ -75,6 +75,8 @@ void taos_cleanup(void) {
|
||||||
|
|
||||||
cleanupTaskQueue();
|
cleanupTaskQueue();
|
||||||
|
|
||||||
|
taosConvDestroy();
|
||||||
|
|
||||||
tscInfo("all local resources released");
|
tscInfo("all local resources released");
|
||||||
taosCleanupCfg();
|
taosCleanupCfg();
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
|
@ -49,7 +49,7 @@ int32_t tsNumOfShmThreads = 1;
|
||||||
// queue & threads
|
// queue & threads
|
||||||
int32_t tsNumOfRpcThreads = 1;
|
int32_t tsNumOfRpcThreads = 1;
|
||||||
int32_t tsNumOfCommitThreads = 2;
|
int32_t tsNumOfCommitThreads = 2;
|
||||||
int32_t tsNumOfTaskQueueThreads = 1;
|
int32_t tsNumOfTaskQueueThreads = 4;
|
||||||
int32_t tsNumOfMnodeQueryThreads = 4;
|
int32_t tsNumOfMnodeQueryThreads = 4;
|
||||||
int32_t tsNumOfMnodeFetchThreads = 1;
|
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||||
int32_t tsNumOfMnodeReadThreads = 1;
|
int32_t tsNumOfMnodeReadThreads = 1;
|
||||||
|
@ -317,9 +317,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
|
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfTaskQueueThreads = tsNumOfCores / 4;
|
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
|
||||||
tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2);
|
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -594,6 +594,20 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) {
|
||||||
|
int32_t len = strlen(name);
|
||||||
|
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||||
|
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
||||||
|
|
||||||
|
if (strcasecmp("charset", name) == 0) {
|
||||||
|
*forbidden = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
*forbidden = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
||||||
int32_t len = strlen(name);
|
int32_t len = strlen(name);
|
||||||
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||||
|
|
|
@ -219,6 +219,8 @@ int mainWindows(int argc,char** argv) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosConvInit();
|
||||||
|
|
||||||
if (global.dumpConfig) {
|
if (global.dumpConfig) {
|
||||||
dmDumpCfg();
|
dmDumpCfg();
|
||||||
taosCleanupCfg();
|
taosCleanupCfg();
|
||||||
|
|
|
@ -215,6 +215,7 @@ void dmCleanupDnode(SDnode *pDnode) {
|
||||||
dmClearVars(pDnode);
|
dmClearVars(pDnode);
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
indexCleanup();
|
indexCleanup();
|
||||||
|
taosConvDestroy();
|
||||||
dDebug("dnode is closed, ptr:%p", pDnode);
|
dDebug("dnode is closed, ptr:%p", pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,6 +90,9 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
||||||
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
SColVal *tRowIterNext(SRowIter *pIter);
|
SColVal *tRowIterNext(SRowIter *pIter);
|
||||||
// SRowMerger
|
// SRowMerger
|
||||||
|
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
|
||||||
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
void tRowMergerClear(SRowMerger *pMerger);
|
void tRowMergerClear(SRowMerger *pMerger);
|
||||||
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
|
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
|
||||||
|
|
|
@ -311,6 +311,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
|
|
||||||
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
|
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
|
||||||
|
|
||||||
|
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
|
||||||
|
|
||||||
wSet.diskId = did;
|
wSet.diskId = did;
|
||||||
wSet.fid = pCommitter->commitFid;
|
wSet.fid = pCommitter->commitFid;
|
||||||
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0};
|
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0};
|
||||||
|
|
|
@ -143,7 +143,7 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
|
||||||
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
||||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
|
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
|
||||||
SRowMerger* pMerger);
|
SRowMerger* pMerger);
|
||||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
||||||
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
|
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
|
||||||
|
@ -212,8 +212,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
pTsdbReader->idStr);
|
pTsdbReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, (sizeof(STableBlockScanInfo)*numOfTables)/1024.0,
|
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
|
||||||
pTsdbReader->idStr);
|
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);
|
||||||
|
|
||||||
return pTableMap;
|
return pTableMap;
|
||||||
}
|
}
|
||||||
|
@ -397,7 +397,8 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
|
||||||
return pResBlock;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, const char* idstr) {
|
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
|
||||||
|
const char* idstr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t level = 0;
|
int8_t level = 0;
|
||||||
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||||
|
@ -577,7 +578,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
||||||
|
|
||||||
int64_t et2 = taosGetTimestampUs();
|
int64_t et2 = taosGetTimestampUs();
|
||||||
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
|
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
|
||||||
(int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx)/1024.0, pReader->idStr);
|
(int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
|
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
|
||||||
|
|
||||||
|
@ -642,9 +643,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st)/1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
|
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
|
||||||
numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, el, pReader->idStr);
|
numOfTables, *numOfBlocks, *numOfValidTables, size / 1000.0, el, pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.numOfBlocks += (*numOfBlocks);
|
pReader->cost.numOfBlocks += (*numOfBlocks);
|
||||||
pReader->cost.headFileLoadTime += el;
|
pReader->cost.headFileLoadTime += el;
|
||||||
|
@ -680,9 +681,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||||
return pFBlockInfo;
|
return pFBlockInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
|
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
|
||||||
return &pBlockIter->block;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
@ -781,8 +780,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
||||||
int32_t code = tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols,
|
int32_t code =
|
||||||
pBlockData, NULL, NULL);
|
tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, NULL, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -937,8 +936,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", pReader, cnt,
|
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
|
||||||
(et - st)/1000.0, pReader->idStr);
|
pReader, cnt, (et - st) / 1000.0, pReader->idStr);
|
||||||
|
|
||||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||||
cleanupBlockOrderSupporter(&sup);
|
cleanupBlockOrderSupporter(&sup);
|
||||||
|
@ -976,7 +975,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et-st)/1000.0, pReader->idStr);
|
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et - st) / 1000.0,
|
||||||
|
pReader->idStr);
|
||||||
cleanupBlockOrderSupporter(&sup);
|
cleanupBlockOrderSupporter(&sup);
|
||||||
taosMemoryFree(pTree);
|
taosMemoryFree(pTree);
|
||||||
|
|
||||||
|
@ -1024,7 +1024,7 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
*nextIndex = pFBlockInfo->tbBlockIdx + step;
|
*nextIndex = pFBlockInfo->tbBlockIdx + step;
|
||||||
|
|
||||||
SBlock *pBlock = taosMemoryCalloc(1, sizeof(SBlock));
|
SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
|
||||||
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||||
|
|
||||||
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
|
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
|
||||||
|
@ -1196,10 +1196,10 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
|
|
||||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug(
|
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
|
||||||
"%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
|
|
||||||
" - %" PRId64 " %s",
|
" - %" PRId64 " %s",
|
||||||
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr);
|
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
|
||||||
|
pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.buildmemBlock += elapsedTime;
|
pReader->cost.buildmemBlock += elapsedTime;
|
||||||
return code;
|
return code;
|
||||||
|
@ -1230,7 +1230,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
|
||||||
tRowMerge(&merge, pRow);
|
tRowMerge(&merge, pRow);
|
||||||
doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
}
|
}
|
||||||
|
@ -1246,7 +1246,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMerge(&merge, &fRow);
|
tRowMerge(&merge, &fRow);
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
@ -1291,12 +1291,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (ik.ts == key) {
|
if (ik.ts == key) {
|
||||||
tRowMerge(&merge, piRow);
|
tRowMerge(&merge, piRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (k.ts == key) {
|
if (k.ts == key) {
|
||||||
tRowMerge(&merge, pRow);
|
tRowMerge(&merge, pRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
@ -1336,11 +1336,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
updateSchema(pRow, uid, pReader);
|
updateSchema(pRow, uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
if (ik.ts == k.ts) {
|
if (ik.ts == k.ts) {
|
||||||
tRowMerge(&merge, piRow);
|
tRowMerge(&merge, piRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (k.ts == key) {
|
if (k.ts == key) {
|
||||||
|
@ -1514,9 +1514,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", pReader,
|
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
||||||
pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
|
" rows:%d, elapsed time:%.2f ms %s",
|
||||||
(et - st)/1000.0, pReader->idStr);
|
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
|
pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2111,7 +2112,8 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) {
|
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||||
|
STsdbReader* pReader) {
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||||
if (!pIter->hasVal) {
|
if (!pIter->hasVal) {
|
||||||
|
@ -2130,7 +2132,19 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMer
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMerge(pMerger, pRow);
|
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||||
|
STSchema* pTSchema = NULL;
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
|
||||||
|
} else {
|
||||||
|
pTSchema = pReader->pSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
tRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
|
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2254,12 +2268,23 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
updateSchema(pRow, uid, pReader);
|
// updateSchema(pRow, uid, pReader);
|
||||||
|
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||||
|
STSchema* pTSchema = NULL;
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
|
||||||
|
} else {
|
||||||
|
pTSchema = pReader->pSchema;
|
||||||
|
}
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit2(&merge, pReader->pSchema, pRow, pTSchema);
|
||||||
doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader);
|
doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader);
|
||||||
tRowMergerGetRow(&merge, pTSRow);
|
tRowMergerGetRow(&merge, pTSRow);
|
||||||
tRowMergerClear(&merge);
|
tRowMergerClear(&merge);
|
||||||
|
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
||||||
|
@ -2273,18 +2298,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
|
||||||
updateSchema(piRow, pBlockScanInfo->uid, pReader);
|
updateSchema(piRow, pBlockScanInfo->uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, piRow, pReader->pSchema);
|
tRowMergerInit(&merge, piRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMerge(&merge, pRow);
|
tRowMerge(&merge, pRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
} else {
|
} else {
|
||||||
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMerge(&merge, piRow);
|
tRowMerge(&merge, piRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, pTSRow);
|
tRowMergerGetRow(&merge, pTSRow);
|
||||||
|
@ -2592,7 +2617,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
STsdbReader* pPrevReader = pReader->innerReader[0];
|
STsdbReader* pPrevReader = pReader->innerReader[0];
|
||||||
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
||||||
|
|
||||||
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr);
|
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
|
||||||
|
pPrevReader->idStr);
|
||||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
|
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
// no data in files, let's try buffer in memory
|
||||||
|
@ -2647,12 +2673,14 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
SIOCostSummary* pCost = &pReader->cost;
|
SIOCostSummary* pCost = &pReader->cost;
|
||||||
|
|
||||||
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%"PRId64" SMA-time:%.2f ms, "
|
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
|
||||||
"fileBlocks:%"PRId64", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
|
" SMA-time:%.2f ms, "
|
||||||
|
"fileBlocks:%" PRId64
|
||||||
|
", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
|
||||||
"size:%.2f Kb %s",
|
"size:%.2f Kb %s",
|
||||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
||||||
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
|
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
|
||||||
numOfTables * sizeof(STableBlockScanInfo) /1000.0, pReader->idStr);
|
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);
|
||||||
|
|
||||||
taosMemoryFree(pReader->idStr);
|
taosMemoryFree(pReader->idStr);
|
||||||
taosMemoryFree(pReader->pSchema);
|
taosMemoryFree(pReader->pSchema);
|
||||||
|
@ -2747,7 +2775,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*allHave = false;
|
*allHave = false;
|
||||||
|
|
||||||
if(pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
*pBlockStatis = NULL;
|
*pBlockStatis = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2891,8 +2919,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
} else {
|
} else {
|
||||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
|
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
|
||||||
pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tdataformat.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
// SMapData =======================================================================
|
// SMapData =======================================================================
|
||||||
|
@ -548,6 +549,103 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SRowMerger ======================================================
|
// SRowMerger ======================================================
|
||||||
|
|
||||||
|
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
STColumn *pTColumn;
|
||||||
|
int32_t iCol, jCol = 0;
|
||||||
|
|
||||||
|
pMerger->pTSchema = pResTSchema;
|
||||||
|
pMerger->version = key.version;
|
||||||
|
|
||||||
|
pMerger->pArray = taosArrayInit(pResTSchema->numOfCols, sizeof(SColVal));
|
||||||
|
if (pMerger->pArray == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ts
|
||||||
|
pTColumn = &pTSchema->columns[jCol++];
|
||||||
|
|
||||||
|
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
|
||||||
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts});
|
||||||
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// other
|
||||||
|
for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) {
|
||||||
|
pTColumn = &pResTSchema->columns[iCol];
|
||||||
|
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
|
||||||
|
++jCol;
|
||||||
|
--iCol;
|
||||||
|
continue;
|
||||||
|
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
||||||
|
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iCol < pResTSchema->numOfCols; ++iCol) {
|
||||||
|
pTColumn = &pResTSchema->columns[iCol];
|
||||||
|
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
STColumn *pTColumn;
|
||||||
|
int32_t iCol, jCol = 1;
|
||||||
|
|
||||||
|
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
|
||||||
|
|
||||||
|
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
|
||||||
|
pTColumn = &pMerger->pTSchema->columns[iCol];
|
||||||
|
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
|
||||||
|
++jCol;
|
||||||
|
--iCol;
|
||||||
|
continue;
|
||||||
|
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
|
|
||||||
|
if (key.version > pMerger->version) {
|
||||||
|
if (!pColVal->isNone) {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
|
} else if (key.version < pMerger->version) {
|
||||||
|
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
if (tColVal->isNone && !pColVal->isNone) {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pMerger->version = key.version;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
|
|
@ -517,6 +517,13 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool forbidden = false;
|
||||||
|
taosLocalCfgForbiddenToChange(pStmt->config, &forbidden);
|
||||||
|
if (forbidden) {
|
||||||
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) {
|
if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2684,6 +2684,7 @@ static int32_t jsonToDataType(const SJson* pJson, void* pObj) {
|
||||||
|
|
||||||
static const char* jkExprDataType = "DataType";
|
static const char* jkExprDataType = "DataType";
|
||||||
static const char* jkExprAliasName = "AliasName";
|
static const char* jkExprAliasName = "AliasName";
|
||||||
|
static const char* jkExprUserAlias = "UserAlias";
|
||||||
|
|
||||||
static int32_t exprNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t exprNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SExprNode* pNode = (const SExprNode*)pObj;
|
const SExprNode* pNode = (const SExprNode*)pObj;
|
||||||
|
@ -2692,6 +2693,9 @@ static int32_t exprNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddStringToObject(pJson, jkExprAliasName, pNode->aliasName);
|
code = tjsonAddStringToObject(pJson, jkExprAliasName, pNode->aliasName);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddStringToObject(pJson, jkExprUserAlias, pNode->userAlias);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2703,6 +2707,9 @@ static int32_t jsonToExprNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetStringValue(pJson, jkExprAliasName, pNode->aliasName);
|
code = tjsonGetStringValue(pJson, jkExprAliasName, pNode->aliasName);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetStringValue(pJson, jkExprUserAlias, pNode->userAlias);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1502,7 +1502,7 @@ static EDealRes collectColumns(SNode* pNode, void* pContext) {
|
||||||
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
|
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
if (isCollectType(pCxt->collectType, pCol->colType) &&
|
if (isCollectType(pCxt->collectType, pCol->colType) && 0 != strcmp(pCol->colName, "*") &&
|
||||||
(NULL == pCxt->pTableAlias || 0 == strcmp(pCxt->pTableAlias, pCol->tableAlias))) {
|
(NULL == pCxt->pTableAlias || 0 == strcmp(pCxt->pTableAlias, pCol->tableAlias))) {
|
||||||
return doCollect(pCxt, pCol, pNode);
|
return doCollect(pCxt, pCol, pNode);
|
||||||
}
|
}
|
||||||
|
@ -1816,187 +1816,3 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SClassifyConditionCxt {
|
|
||||||
bool hasPrimaryKey;
|
|
||||||
bool hasTagIndexCol;
|
|
||||||
bool hasTagCol;
|
|
||||||
bool hasOtherCol;
|
|
||||||
} SClassifyConditionCxt;
|
|
||||||
|
|
||||||
static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) {
|
|
||||||
SClassifyConditionCxt* pCxt = (SClassifyConditionCxt*)pContext;
|
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
|
||||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && TSDB_SYSTEM_TABLE != pCol->tableType) {
|
|
||||||
pCxt->hasPrimaryKey = true;
|
|
||||||
} else if (pCol->hasIndex) {
|
|
||||||
pCxt->hasTagIndexCol = true;
|
|
||||||
pCxt->hasTagCol = true;
|
|
||||||
} else if (COLUMN_TYPE_TAG == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType) {
|
|
||||||
pCxt->hasTagCol = true;
|
|
||||||
} else {
|
|
||||||
pCxt->hasOtherCol = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return DEAL_RES_CONTINUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef enum EConditionType {
|
|
||||||
COND_TYPE_PRIMARY_KEY = 1,
|
|
||||||
COND_TYPE_TAG_INDEX,
|
|
||||||
COND_TYPE_TAG,
|
|
||||||
COND_TYPE_NORMAL
|
|
||||||
} EConditionType;
|
|
||||||
|
|
||||||
static EConditionType classifyCondition(SNode* pNode) {
|
|
||||||
SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false};
|
|
||||||
nodesWalkExpr(pNode, classifyConditionImpl, &cxt);
|
|
||||||
return cxt.hasOtherCol ? COND_TYPE_NORMAL
|
|
||||||
: (cxt.hasPrimaryKey && cxt.hasTagCol
|
|
||||||
? COND_TYPE_NORMAL
|
|
||||||
: (cxt.hasPrimaryKey ? COND_TYPE_PRIMARY_KEY
|
|
||||||
: (cxt.hasTagIndexCol ? COND_TYPE_TAG_INDEX : COND_TYPE_TAG)));
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
|
|
||||||
SNode** pOtherCond) {
|
|
||||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(*pCondition);
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
SNodeList* pPrimaryKeyConds = NULL;
|
|
||||||
SNodeList* pTagIndexConds = NULL;
|
|
||||||
SNodeList* pTagConds = NULL;
|
|
||||||
SNodeList* pOtherConds = NULL;
|
|
||||||
SNode* pCond = NULL;
|
|
||||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
|
||||||
switch (classifyCondition(pCond)) {
|
|
||||||
case COND_TYPE_PRIMARY_KEY:
|
|
||||||
if (NULL != pPrimaryKeyCond) {
|
|
||||||
code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case COND_TYPE_TAG_INDEX:
|
|
||||||
if (NULL != pTagIndexCond) {
|
|
||||||
code = nodesListMakeAppend(&pTagIndexConds, nodesCloneNode(pCond));
|
|
||||||
}
|
|
||||||
if (NULL != pTagCond) {
|
|
||||||
code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case COND_TYPE_TAG:
|
|
||||||
if (NULL != pTagCond) {
|
|
||||||
code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case COND_TYPE_NORMAL:
|
|
||||||
default:
|
|
||||||
if (NULL != pOtherCond) {
|
|
||||||
code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SNode* pTempPrimaryKeyCond = NULL;
|
|
||||||
SNode* pTempTagIndexCond = NULL;
|
|
||||||
SNode* pTempTagCond = NULL;
|
|
||||||
SNode* pTempOtherCond = NULL;
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesMergeConds(&pTempTagIndexCond, &pTagIndexConds);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesMergeConds(&pTempTagCond, &pTagConds);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesMergeConds(&pTempOtherCond, &pOtherConds);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
if (NULL != pPrimaryKeyCond) {
|
|
||||||
*pPrimaryKeyCond = pTempPrimaryKeyCond;
|
|
||||||
}
|
|
||||||
if (NULL != pTagIndexCond) {
|
|
||||||
*pTagIndexCond = pTempTagIndexCond;
|
|
||||||
}
|
|
||||||
if (NULL != pTagCond) {
|
|
||||||
*pTagCond = pTempTagCond;
|
|
||||||
}
|
|
||||||
if (NULL != pOtherCond) {
|
|
||||||
*pOtherCond = pTempOtherCond;
|
|
||||||
}
|
|
||||||
nodesDestroyNode(*pCondition);
|
|
||||||
*pCondition = NULL;
|
|
||||||
} else {
|
|
||||||
nodesDestroyList(pPrimaryKeyConds);
|
|
||||||
nodesDestroyList(pTagIndexConds);
|
|
||||||
nodesDestroyList(pTagConds);
|
|
||||||
nodesDestroyList(pOtherConds);
|
|
||||||
nodesDestroyNode(pTempPrimaryKeyCond);
|
|
||||||
nodesDestroyNode(pTempTagIndexCond);
|
|
||||||
nodesDestroyNode(pTempTagCond);
|
|
||||||
nodesDestroyNode(pTempOtherCond);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
|
|
||||||
SNode** pOtherCond) {
|
|
||||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCondition) &&
|
|
||||||
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)*pCondition)->condType) {
|
|
||||||
return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool needOutput = false;
|
|
||||||
switch (classifyCondition(*pCondition)) {
|
|
||||||
case COND_TYPE_PRIMARY_KEY:
|
|
||||||
if (NULL != pPrimaryKeyCond) {
|
|
||||||
*pPrimaryKeyCond = *pCondition;
|
|
||||||
needOutput = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case COND_TYPE_TAG_INDEX:
|
|
||||||
if (NULL != pTagIndexCond) {
|
|
||||||
*pTagIndexCond = *pCondition;
|
|
||||||
needOutput = true;
|
|
||||||
}
|
|
||||||
if (NULL != pTagCond) {
|
|
||||||
SNode* pTempCond = *pCondition;
|
|
||||||
if (NULL != pTagIndexCond) {
|
|
||||||
pTempCond = nodesCloneNode(*pCondition);
|
|
||||||
if (NULL == pTempCond) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*pTagCond = pTempCond;
|
|
||||||
needOutput = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case COND_TYPE_TAG:
|
|
||||||
if (NULL != pTagCond) {
|
|
||||||
*pTagCond = *pCondition;
|
|
||||||
needOutput = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case COND_TYPE_NORMAL:
|
|
||||||
default:
|
|
||||||
if (NULL != pOtherCond) {
|
|
||||||
*pOtherCond = *pCondition;
|
|
||||||
needOutput = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (needOutput) {
|
|
||||||
*pCondition = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
|
@ -233,18 +233,22 @@ SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const
|
||||||
SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
|
SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
SRawExprNode* pRawExpr = (SRawExprNode*)pNode;
|
SRawExprNode* pRawExpr = (SRawExprNode*)pNode;
|
||||||
SNode* pExpr = pRawExpr->pNode;
|
SNode* pRealizedExpr = pRawExpr->pNode;
|
||||||
if (nodesIsExprNode(pExpr)) {
|
if (nodesIsExprNode(pRealizedExpr)) {
|
||||||
|
SExprNode* pExpr = (SExprNode*)pRealizedExpr;
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
||||||
strcpy(((SExprNode*)pExpr)->aliasName, ((SColumnNode*)pExpr)->colName);
|
strcpy(pExpr->aliasName, ((SColumnNode*)pExpr)->colName);
|
||||||
|
strcpy(pExpr->userAlias, ((SColumnNode*)pExpr)->colName);
|
||||||
} else {
|
} else {
|
||||||
int32_t len = TMIN(sizeof(((SExprNode*)pExpr)->aliasName) - 1, pRawExpr->n);
|
int32_t len = TMIN(sizeof(pExpr->aliasName) - 1, pRawExpr->n);
|
||||||
strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, len);
|
strncpy(pExpr->aliasName, pRawExpr->p, len);
|
||||||
((SExprNode*)pExpr)->aliasName[len] = '\0';
|
pExpr->aliasName[len] = '\0';
|
||||||
|
strncpy(pExpr->userAlias, pRawExpr->p, len);
|
||||||
|
pExpr->userAlias[len] = '\0';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pNode);
|
taosMemoryFreeClear(pNode);
|
||||||
return pExpr;
|
return pRealizedExpr;
|
||||||
}
|
}
|
||||||
|
|
||||||
SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
|
SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
|
||||||
|
@ -641,11 +645,12 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd
|
||||||
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) {
|
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
trimEscape(pAlias);
|
trimEscape(pAlias);
|
||||||
int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n);
|
SExprNode* pExpr = (SExprNode*)pNode;
|
||||||
strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len);
|
int32_t len = TMIN(sizeof(pExpr->aliasName) - 1, pAlias->n);
|
||||||
((SExprNode*)pNode)->aliasName[len] = '\0';
|
strncpy(pExpr->aliasName, pAlias->z, len);
|
||||||
strncpy(((SExprNode*)pNode)->userAlias, pAlias->z, len);
|
pExpr->aliasName[len] = '\0';
|
||||||
((SExprNode*)pNode)->userAlias[len] = '\0';
|
strncpy(pExpr->userAlias, pAlias->z, len);
|
||||||
|
pExpr->userAlias[len] = '\0';
|
||||||
return pNode;
|
return pNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -766,13 +771,21 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
|
||||||
return (SNode*)select;
|
return (SNode*)select;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setSubquery(SNode* pStmt) {
|
||||||
|
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||||
|
((SSelectStmt*)pStmt)->isSubquery = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight) {
|
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
SSetOperator* setOp = (SSetOperator*)nodesMakeNode(QUERY_NODE_SET_OPERATOR);
|
SSetOperator* setOp = (SSetOperator*)nodesMakeNode(QUERY_NODE_SET_OPERATOR);
|
||||||
CHECK_OUT_OF_MEM(setOp);
|
CHECK_OUT_OF_MEM(setOp);
|
||||||
setOp->opType = type;
|
setOp->opType = type;
|
||||||
setOp->pLeft = pLeft;
|
setOp->pLeft = pLeft;
|
||||||
|
setSubquery(setOp->pLeft);
|
||||||
setOp->pRight = pRight;
|
setOp->pRight = pRight;
|
||||||
|
setSubquery(setOp->pRight);
|
||||||
sprintf(setOp->stmtName, "%p", setOp);
|
sprintf(setOp->stmtName, "%p", setOp);
|
||||||
return (SNode*)setOp;
|
return (SNode*)setOp;
|
||||||
}
|
}
|
||||||
|
|
|
@ -523,6 +523,9 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
|
||||||
if ('\0' == pCol->node.aliasName[0]) {
|
if ('\0' == pCol->node.aliasName[0]) {
|
||||||
strcpy(pCol->node.aliasName, pColSchema->name);
|
strcpy(pCol->node.aliasName, pColSchema->name);
|
||||||
}
|
}
|
||||||
|
if ('\0' == pCol->node.userAlias[0]) {
|
||||||
|
strcpy(pCol->node.userAlias, pColSchema->name);
|
||||||
|
}
|
||||||
pCol->tableId = pTable->pMeta->uid;
|
pCol->tableId = pTable->pMeta->uid;
|
||||||
pCol->tableType = pTable->pMeta->tableType;
|
pCol->tableType = pTable->pMeta->tableType;
|
||||||
pCol->colId = pColSchema->colId;
|
pCol->colId = pColSchema->colId;
|
||||||
|
@ -549,6 +552,9 @@ static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColum
|
||||||
if ('\0' == pCol->node.aliasName[0]) {
|
if ('\0' == pCol->node.aliasName[0]) {
|
||||||
strcpy(pCol->node.aliasName, pCol->colName);
|
strcpy(pCol->node.aliasName, pCol->colName);
|
||||||
}
|
}
|
||||||
|
if ('\0' == pCol->node.userAlias[0]) {
|
||||||
|
strcpy(pCol->node.userAlias, pCol->colName);
|
||||||
|
}
|
||||||
pCol->node.resType = pExpr->resType;
|
pCol->node.resType = pExpr->resType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -691,7 +697,7 @@ static EDealRes translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** p
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
FOREACH(pNode, pProjectionList) {
|
FOREACH(pNode, pProjectionList) {
|
||||||
SExprNode* pExpr = (SExprNode*)pNode;
|
SExprNode* pExpr = (SExprNode*)pNode;
|
||||||
if (0 == strcmp((*pCol)->colName, pExpr->aliasName)) {
|
if (0 == strcmp((*pCol)->colName, pExpr->userAlias)) {
|
||||||
SColumnRefNode* pColRef = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
|
SColumnRefNode* pColRef = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
|
||||||
if (NULL == pColRef) {
|
if (NULL == pColRef) {
|
||||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1535,6 +1541,7 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode
|
||||||
}
|
}
|
||||||
strcpy(pFunc->functionName, "_select_value");
|
strcpy(pFunc->functionName, "_select_value");
|
||||||
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
|
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
|
||||||
|
strcpy(pFunc->node.userAlias, ((SExprNode*)*pNode)->userAlias);
|
||||||
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
|
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||||
pCxt->errCode = getFuncInfo(pCxt, pFunc);
|
pCxt->errCode = getFuncInfo(pCxt, pFunc);
|
||||||
|
@ -2171,12 +2178,51 @@ static int32_t translateFillValues(STranslateContext* pCxt, SSelectStmt* pSelect
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t rewriteProjectAlias(SNodeList* pProjectionList) {
|
||||||
|
int32_t no = 1;
|
||||||
|
SNode* pProject = NULL;
|
||||||
|
FOREACH(pProject, pProjectionList) {
|
||||||
|
SExprNode* pExpr = (SExprNode*)pProject;
|
||||||
|
if ('\0' == pExpr->userAlias[0]) {
|
||||||
|
strcpy(pExpr->userAlias, pExpr->aliasName);
|
||||||
|
}
|
||||||
|
sprintf(pExpr->aliasName, "#expr_%d", no++);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjectionList) {
|
||||||
|
SHashObj* pUserAliasSet = taosHashInit(LIST_LENGTH(pProjectionList),
|
||||||
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
SNode* pProject = NULL;
|
||||||
|
FOREACH(pProject, pProjectionList) {
|
||||||
|
SExprNode* pExpr = (SExprNode*)pProject;
|
||||||
|
if (NULL != taosHashGet(pUserAliasSet, pExpr->userAlias, strlen(pExpr->userAlias))) {
|
||||||
|
taosHashCleanup(pUserAliasSet);
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AMBIGUOUS_COLUMN, pExpr->userAlias);
|
||||||
|
}
|
||||||
|
taosHashPut(pUserAliasSet, pExpr->userAlias, strlen(pExpr->userAlias), &pExpr, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
taosHashCleanup(pUserAliasSet);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateProjectionList(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
|
if (pSelect->isSubquery) {
|
||||||
|
return checkProjectAlias(pCxt, pSelect->pProjectionList);
|
||||||
|
}
|
||||||
|
return rewriteProjectAlias(pSelect->pProjectionList);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
pCxt->currClause = SQL_CLAUSE_SELECT;
|
pCxt->currClause = SQL_CLAUSE_SELECT;
|
||||||
int32_t code = translateExprList(pCxt, pSelect->pProjectionList);
|
int32_t code = translateExprList(pCxt, pSelect->pProjectionList);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateStar(pCxt, pSelect);
|
code = translateStar(pCxt, pSelect);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = translateProjectionList(pCxt, pSelect);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList);
|
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList);
|
||||||
}
|
}
|
||||||
|
@ -2232,7 +2278,7 @@ static int32_t getQueryTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWi
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* pPrimaryKeyCond = NULL;
|
SNode* pPrimaryKeyCond = NULL;
|
||||||
nodesPartitionCond(&pCond, &pPrimaryKeyCond, NULL, NULL, NULL);
|
filterPartitionCond(&pCond, &pPrimaryKeyCond, NULL, NULL, NULL);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (NULL != pPrimaryKeyCond) {
|
if (NULL != pPrimaryKeyCond) {
|
||||||
|
@ -2699,6 +2745,7 @@ static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) {
|
||||||
strcpy(pCol->tableAlias, pTableAlias);
|
strcpy(pCol->tableAlias, pTableAlias);
|
||||||
strcpy(pCol->colName, ((SExprNode*)pNode)->aliasName);
|
strcpy(pCol->colName, ((SExprNode*)pNode)->aliasName);
|
||||||
strcpy(pCol->node.aliasName, pCol->colName);
|
strcpy(pCol->node.aliasName, pCol->colName);
|
||||||
|
strcpy(pCol->node.userAlias, ((SExprNode*)pNode)->userAlias);
|
||||||
return (SNode*)pCol;
|
return (SNode*)pCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2810,7 +2857,7 @@ static int32_t partitionDeleteWhere(STranslateContext* pCxt, SDeleteStmt* pDelet
|
||||||
|
|
||||||
SNode* pPrimaryKeyCond = NULL;
|
SNode* pPrimaryKeyCond = NULL;
|
||||||
SNode* pOtherCond = NULL;
|
SNode* pOtherCond = NULL;
|
||||||
int32_t code = nodesPartitionCond(&pDelete->pWhere, &pPrimaryKeyCond, NULL, &pDelete->pTagCond, &pOtherCond);
|
int32_t code = filterPartitionCond(&pDelete->pWhere, &pPrimaryKeyCond, NULL, &pDelete->pTagCond, &pOtherCond);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pOtherCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pOtherCond) {
|
||||||
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DELETE_WHERE);
|
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DELETE_WHERE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,8 @@ TEST_F(ParserSelectTest, condition) {
|
||||||
run("SELECT c1 FROM t1 WHERE NOT ts in (true, false)");
|
run("SELECT c1 FROM t1 WHERE NOT ts in (true, false)");
|
||||||
|
|
||||||
run("SELECT * FROM t1 WHERE c1 > 10 and c1 is not null");
|
run("SELECT * FROM t1 WHERE c1 > 10 and c1 is not null");
|
||||||
|
|
||||||
|
run("SELECT * FROM t1 WHERE TBNAME like 'fda%' or TS > '2021-05-05 18:19:01.000'");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ParserSelectTest, pseudoColumn) {
|
TEST_F(ParserSelectTest, pseudoColumn) {
|
||||||
|
|
|
@ -436,7 +436,7 @@ static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* p
|
||||||
|
|
||||||
SNode* pPrimaryKeyCond = NULL;
|
SNode* pPrimaryKeyCond = NULL;
|
||||||
SNode* pOtherCond = NULL;
|
SNode* pOtherCond = NULL;
|
||||||
int32_t code = nodesPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond,
|
int32_t code = filterPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond,
|
||||||
&pOtherCond);
|
&pOtherCond);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pScan->pTagCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pScan->pTagCond) {
|
||||||
code = pushDownCondOptRebuildTbanme(&pScan->pTagCond);
|
code = pushDownCondOptRebuildTbanme(&pScan->pTagCond);
|
||||||
|
|
|
@ -763,6 +763,8 @@ static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
|
||||||
|
strcpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName);
|
||||||
|
strcpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName);
|
||||||
strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
|
strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
|
||||||
}
|
}
|
||||||
strcpy(pCol->colName, pExpr->aliasName);
|
strcpy(pCol->colName, pExpr->aliasName);
|
||||||
|
|
|
@ -39,6 +39,8 @@ TEST_F(PlanOrderByTest, expr) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("SELECT * FROM t1 ORDER BY c1 + 10, c2");
|
run("SELECT * FROM t1 ORDER BY c1 + 10, c2");
|
||||||
|
|
||||||
|
run("SELECT c1 FROM st1 ORDER BY ts, _C0");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(PlanOrderByTest, nullsOrder) {
|
TEST_F(PlanOrderByTest, nullsOrder) {
|
||||||
|
|
|
@ -73,3 +73,9 @@ TEST_F(PlanSubqeuryTest, outerInterval) {
|
||||||
|
|
||||||
run("SELECT COUNT(*) FROM (SELECT ts, TOP(c1, 10) FROM st1s1) INTERVAL(5s)");
|
run("SELECT COUNT(*) FROM (SELECT ts, TOP(c1, 10) FROM st1s1) INTERVAL(5s)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PlanSubqeuryTest, outerPartition) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
run("SELECT c1, COUNT(*) FROM (SELECT ts, c1 FROM st1) PARTITION BY c1");
|
||||||
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
#include "functionMgt.h"
|
||||||
|
|
||||||
OptrStr gOptrStr[] = {
|
OptrStr gOptrStr[] = {
|
||||||
{0, "invalid"},
|
{0, "invalid"},
|
||||||
|
@ -3877,4 +3878,195 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct SClassifyConditionCxt {
|
||||||
|
bool hasPrimaryKey;
|
||||||
|
bool hasTagIndexCol;
|
||||||
|
bool hasTagCol;
|
||||||
|
bool hasOtherCol;
|
||||||
|
} SClassifyConditionCxt;
|
||||||
|
|
||||||
|
static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) {
|
||||||
|
SClassifyConditionCxt* pCxt = (SClassifyConditionCxt*)pContext;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && TSDB_SYSTEM_TABLE != pCol->tableType) {
|
||||||
|
pCxt->hasPrimaryKey = true;
|
||||||
|
} else if (pCol->hasIndex) {
|
||||||
|
pCxt->hasTagIndexCol = true;
|
||||||
|
pCxt->hasTagCol = true;
|
||||||
|
} else if (COLUMN_TYPE_TAG == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType) {
|
||||||
|
pCxt->hasTagCol = true;
|
||||||
|
} else {
|
||||||
|
pCxt->hasOtherCol = true;
|
||||||
|
}
|
||||||
|
} else if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
|
||||||
|
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||||
|
if (fmIsPseudoColumnFunc(pFunc->funcId)) {
|
||||||
|
if (FUNCTION_TYPE_TBNAME==pFunc->funcType) {
|
||||||
|
pCxt->hasTagCol = true;
|
||||||
|
} else {
|
||||||
|
pCxt->hasOtherCol = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef enum EConditionType {
|
||||||
|
COND_TYPE_PRIMARY_KEY = 1,
|
||||||
|
COND_TYPE_TAG_INDEX,
|
||||||
|
COND_TYPE_TAG,
|
||||||
|
COND_TYPE_NORMAL
|
||||||
|
} EConditionType;
|
||||||
|
|
||||||
|
static EConditionType classifyCondition(SNode* pNode) {
|
||||||
|
SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false};
|
||||||
|
nodesWalkExpr(pNode, classifyConditionImpl, &cxt);
|
||||||
|
return cxt.hasOtherCol ? COND_TYPE_NORMAL
|
||||||
|
: (cxt.hasPrimaryKey && cxt.hasTagCol
|
||||||
|
? COND_TYPE_NORMAL
|
||||||
|
: (cxt.hasPrimaryKey ? COND_TYPE_PRIMARY_KEY
|
||||||
|
: (cxt.hasTagIndexCol ? COND_TYPE_TAG_INDEX : COND_TYPE_TAG)));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
|
||||||
|
SNode** pOtherCond) {
|
||||||
|
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(*pCondition);
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SNodeList* pPrimaryKeyConds = NULL;
|
||||||
|
SNodeList* pTagIndexConds = NULL;
|
||||||
|
SNodeList* pTagConds = NULL;
|
||||||
|
SNodeList* pOtherConds = NULL;
|
||||||
|
SNode* pCond = NULL;
|
||||||
|
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||||
|
switch (classifyCondition(pCond)) {
|
||||||
|
case COND_TYPE_PRIMARY_KEY:
|
||||||
|
if (NULL != pPrimaryKeyCond) {
|
||||||
|
code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case COND_TYPE_TAG_INDEX:
|
||||||
|
if (NULL != pTagIndexCond) {
|
||||||
|
code = nodesListMakeAppend(&pTagIndexConds, nodesCloneNode(pCond));
|
||||||
|
}
|
||||||
|
if (NULL != pTagCond) {
|
||||||
|
code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case COND_TYPE_TAG:
|
||||||
|
if (NULL != pTagCond) {
|
||||||
|
code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case COND_TYPE_NORMAL:
|
||||||
|
default:
|
||||||
|
if (NULL != pOtherCond) {
|
||||||
|
code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pTempPrimaryKeyCond = NULL;
|
||||||
|
SNode* pTempTagIndexCond = NULL;
|
||||||
|
SNode* pTempTagCond = NULL;
|
||||||
|
SNode* pTempOtherCond = NULL;
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesMergeConds(&pTempTagIndexCond, &pTagIndexConds);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesMergeConds(&pTempTagCond, &pTagConds);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesMergeConds(&pTempOtherCond, &pOtherConds);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if (NULL != pPrimaryKeyCond) {
|
||||||
|
*pPrimaryKeyCond = pTempPrimaryKeyCond;
|
||||||
|
}
|
||||||
|
if (NULL != pTagIndexCond) {
|
||||||
|
*pTagIndexCond = pTempTagIndexCond;
|
||||||
|
}
|
||||||
|
if (NULL != pTagCond) {
|
||||||
|
*pTagCond = pTempTagCond;
|
||||||
|
}
|
||||||
|
if (NULL != pOtherCond) {
|
||||||
|
*pOtherCond = pTempOtherCond;
|
||||||
|
}
|
||||||
|
nodesDestroyNode(*pCondition);
|
||||||
|
*pCondition = NULL;
|
||||||
|
} else {
|
||||||
|
nodesDestroyList(pPrimaryKeyConds);
|
||||||
|
nodesDestroyList(pTagIndexConds);
|
||||||
|
nodesDestroyList(pTagConds);
|
||||||
|
nodesDestroyList(pOtherConds);
|
||||||
|
nodesDestroyNode(pTempPrimaryKeyCond);
|
||||||
|
nodesDestroyNode(pTempTagIndexCond);
|
||||||
|
nodesDestroyNode(pTempTagCond);
|
||||||
|
nodesDestroyNode(pTempOtherCond);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t filterPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
|
||||||
|
SNode** pOtherCond) {
|
||||||
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCondition) &&
|
||||||
|
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)*pCondition)->condType) {
|
||||||
|
return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool needOutput = false;
|
||||||
|
switch (classifyCondition(*pCondition)) {
|
||||||
|
case COND_TYPE_PRIMARY_KEY:
|
||||||
|
if (NULL != pPrimaryKeyCond) {
|
||||||
|
*pPrimaryKeyCond = *pCondition;
|
||||||
|
needOutput = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case COND_TYPE_TAG_INDEX:
|
||||||
|
if (NULL != pTagIndexCond) {
|
||||||
|
*pTagIndexCond = *pCondition;
|
||||||
|
needOutput = true;
|
||||||
|
}
|
||||||
|
if (NULL != pTagCond) {
|
||||||
|
SNode* pTempCond = *pCondition;
|
||||||
|
if (NULL != pTagIndexCond) {
|
||||||
|
pTempCond = nodesCloneNode(*pCondition);
|
||||||
|
if (NULL == pTempCond) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*pTagCond = pTempCond;
|
||||||
|
needOutput = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case COND_TYPE_TAG:
|
||||||
|
if (NULL != pTagCond) {
|
||||||
|
*pTagCond = *pCondition;
|
||||||
|
needOutput = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case COND_TYPE_NORMAL:
|
||||||
|
default:
|
||||||
|
if (NULL != pOtherCond) {
|
||||||
|
*pOtherCond = *pCondition;
|
||||||
|
needOutput = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (needOutput) {
|
||||||
|
*pCondition = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
|
@ -278,7 +278,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SHashObj *planToTask = taosHashInit(
|
SHashObj *planToTask = taosHashInit(
|
||||||
SCHEDULE_DEFAULT_MAX_TASK_NUM,
|
pDag->numOfSubplans,
|
||||||
taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
|
taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
|
||||||
HASH_NO_LOCK);
|
HASH_NO_LOCK);
|
||||||
if (NULL == planToTask) {
|
if (NULL == planToTask) {
|
||||||
|
|
|
@ -134,21 +134,95 @@ int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
iconv_t conv;
|
||||||
|
int8_t inUse;
|
||||||
|
} SConv;
|
||||||
|
|
||||||
|
SConv *gConv = NULL;
|
||||||
|
int32_t convUsed = 0;
|
||||||
|
int32_t gConvMaxNum = 0;
|
||||||
|
|
||||||
|
void taosConvInit(void) {
|
||||||
|
gConvMaxNum = 512;
|
||||||
|
gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv));
|
||||||
|
for (int32_t i = 0; i < gConvMaxNum; ++i) {
|
||||||
|
gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset);
|
||||||
|
if ((iconv_t)-1 == gConv[i].conv || (iconv_t)0 == gConv[i].conv) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosConvDestroy() {
|
||||||
|
for (int32_t i = 0; i < gConvMaxNum; ++i) {
|
||||||
|
iconv_close(gConv[i].conv);
|
||||||
|
}
|
||||||
|
taosMemoryFreeClear(gConv);
|
||||||
|
gConvMaxNum = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
iconv_t taosAcquireConv(int32_t *idx) {
|
||||||
|
if (gConvMaxNum <= 0) {
|
||||||
|
*idx = -1;
|
||||||
|
return iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
int32_t used = atomic_add_fetch_32(&convUsed, 1);
|
||||||
|
if (used > gConvMaxNum) {
|
||||||
|
used = atomic_sub_fetch_32(&convUsed, 1);
|
||||||
|
sched_yield();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t startId = taosGetSelfPthreadId() % gConvMaxNum;
|
||||||
|
while (true) {
|
||||||
|
if (gConv[startId].inUse) {
|
||||||
|
startId = (startId + 1) % gConvMaxNum;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int8_t old = atomic_val_compare_exchange_8(&gConv[startId].inUse, 0, 1);
|
||||||
|
if (0 == old) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*idx = startId;
|
||||||
|
return gConv[startId].conv;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosReleaseConv(int32_t idx, iconv_t conv) {
|
||||||
|
if (idx < 0) {
|
||||||
|
iconv_close(conv);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_store_8(&gConv[idx].inUse, 0);
|
||||||
|
atomic_sub_fetch_32(&convUsed, 1);
|
||||||
|
}
|
||||||
|
|
||||||
bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len) {
|
bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len) {
|
||||||
#ifdef DISALLOW_NCHAR_WITHOUT_ICONV
|
#ifdef DISALLOW_NCHAR_WITHOUT_ICONV
|
||||||
printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n");
|
printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n");
|
||||||
return -1;
|
return -1;
|
||||||
#else
|
#else
|
||||||
memset(ucs4, 0, ucs4_max_len);
|
memset(ucs4, 0, ucs4_max_len);
|
||||||
iconv_t cd = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset);
|
|
||||||
|
int32_t idx = -1;
|
||||||
|
iconv_t conv = taosAcquireConv(&idx);
|
||||||
size_t ucs4_input_len = mbsLength;
|
size_t ucs4_input_len = mbsLength;
|
||||||
size_t outLeft = ucs4_max_len;
|
size_t outLeft = ucs4_max_len;
|
||||||
if (iconv(cd, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) {
|
if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) {
|
||||||
iconv_close(cd);
|
taosReleaseConv(idx, conv);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
iconv_close(cd);
|
taosReleaseConv(idx, conv);
|
||||||
if (len != NULL) {
|
if (len != NULL) {
|
||||||
*len = (int32_t)(ucs4_max_len - outLeft);
|
*len = (int32_t)(ucs4_max_len - outLeft);
|
||||||
if (*len < 0) {
|
if (*len < 0) {
|
||||||
|
|
|
@ -2631,7 +2631,7 @@ sql_error select tb1.ts,tb1.c1,tb2_1.u1 from tb1, tb2_1 where tb1.ts=tb2_1.ts or
|
||||||
|
|
||||||
|
|
||||||
print "ts&tbname test"
|
print "ts&tbname test"
|
||||||
sql_error select count(*) from stb1 where ts > 0 or tbname like 'tb%';
|
sql select count(*) from stb1 where ts > 0 or tbname like 'tb%';
|
||||||
|
|
||||||
print "ts&tag test"
|
print "ts&tag test"
|
||||||
sql select count(*) from stb1 where ts > 0 or t1 > 0;
|
sql select count(*) from stb1 where ts > 0 or t1 > 0;
|
||||||
|
@ -2717,9 +2717,9 @@ print "tbname&tag&join test"
|
||||||
|
|
||||||
|
|
||||||
print "column&ts&tbname&tag test"
|
print "column&ts&tbname&tag test"
|
||||||
sql_error select * from stb1 where (tbname like 'tb%' or ts > '2021-05-05 18:19:01.000') and (t1 > 5 or t1 < 4) and c1 > 0;
|
sql select * from stb1 where (tbname like 'tb%' or ts > '2021-05-05 18:19:01.000') and (t1 > 5 or t1 < 4) and c1 > 0;
|
||||||
sql select * from stb1 where (ts > '2021-05-05 18:19:01.000') and (ts > '2021-05-05 18:19:02.000' or t1 > 3) and (t1 > 5 or t1 < 4) and c1 > 0;
|
sql select * from stb1 where (ts > '2021-05-05 18:19:01.000') and (ts > '2021-05-05 18:19:02.000' or t1 > 3) and (t1 > 5 or t1 < 4) and c1 > 0;
|
||||||
sql_error select ts,c1,c7 from stb1 where ts > '2021-05-05 18:19:03.000' or ts > '2021-05-05 18:19:20.000' and col > 0 and t1 > 0;
|
sql select ts,c1,c7 from stb1 where ts > '2021-05-05 18:19:03.000' or ts > '2021-05-05 18:19:20.000' and c1 > 0 and t1 > 0;
|
||||||
|
|
||||||
|
|
||||||
print "column&ts&tbname&join test"
|
print "column&ts&tbname&join test"
|
||||||
|
|
|
@ -112,7 +112,7 @@ ELSE ()
|
||||||
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
||||||
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
||||||
INSTALL_COMMAND
|
INSTALL_COMMAND
|
||||||
COMMAND wget -c https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz -O ${CMAKE_CURRENT_SOURCE_DIR}/upx.tar.xz && tar -xvJf ${CMAKE_CURRENT_SOURCE_DIR}/upx.tar.xz -C ${CMAKE_CURRENT_SOURCE_DIR} --strip-components 1 > /dev/null && ${CMAKE_CURRENT_SOURCE_DIR}/upx taosadapter || :
|
COMMAND wget -c https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz -O $ENV{HOME}/upx.tar.xz && tar -xvJf $ENV{HOME}/upx.tar.xz -C $ENV{HOME}/ --strip-components 1 > /dev/null && $ENV{HOME}/upx taosadapter || :
|
||||||
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
|
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
|
||||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
|
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
|
||||||
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
|
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
|
||||||
|
|
Loading…
Reference in New Issue