Merge branch 'v3.0' into fix/long_query
This commit is contained in:
commit
07f16b1660
|
@ -2,7 +2,7 @@
|
|||
# taosadapter
|
||||
ExternalProject_Add(taosadapter
|
||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||
GIT_TAG 5662a6d
|
||||
GIT_TAG a2e9920
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -94,6 +94,7 @@ description: "TDengine 3.0 版本的语法变更说明"
|
|||
| 9 | SAMPLE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
| 10 | STATECOUNT | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
| 11 | STATEDURATION | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
| 12 | TIMETRUNCATE | 增强 | 增加ignore_timezone参数,可选是否使用,默认值为1.
|
||||
|
||||
|
||||
## SCHEMALESS 变更
|
||||
|
|
|
@ -401,6 +401,7 @@ typedef struct SCreateStreamStmt {
|
|||
SNode* pQuery;
|
||||
SNodeList* pTags;
|
||||
SNode* pSubtable;
|
||||
SNodeList* pCols;
|
||||
} SCreateStreamStmt;
|
||||
|
||||
typedef struct SDropStreamStmt {
|
||||
|
|
|
@ -22,12 +22,16 @@ extern "C" {
|
|||
|
||||
// If the error is in a third-party library, place this header file under the third-party library header file.
|
||||
// When you want to use this feature, you should find or add the same function in the following sectio
|
||||
#if !defined(WINDOWS)
|
||||
|
||||
#ifndef ALLOW_FORBID_FUNC
|
||||
#define malloc MALLOC_FUNC_TAOS_FORBID
|
||||
#define calloc CALLOC_FUNC_TAOS_FORBID
|
||||
#define realloc REALLOC_FUNC_TAOS_FORBID
|
||||
#define free FREE_FUNC_TAOS_FORBID
|
||||
#endif
|
||||
#endif // ifndef ALLOW_FORBID_FUNC
|
||||
|
||||
#endif // if !defined(WINDOWS)
|
||||
|
||||
void *taosMemoryMalloc(int64_t size);
|
||||
void *taosMemoryCalloc(int64_t num, int64_t size);
|
||||
|
|
|
@ -62,11 +62,38 @@ void taosResetTerminalMode();
|
|||
taosMemoryFree(strings); \
|
||||
}
|
||||
#else
|
||||
#include <windows.h>
|
||||
#include <dbghelp.h>
|
||||
|
||||
#define STACKSIZE 64
|
||||
#define taosPrintTrace(flags, level, dflag) \
|
||||
{ \
|
||||
taosPrintLog(flags, level, dflag, \
|
||||
"backtrace not implemented on windows, so detailed stack information cannot be printed"); \
|
||||
}
|
||||
unsigned int i; \
|
||||
void* stack[STACKSIZE]; \
|
||||
unsigned short frames; \
|
||||
SYMBOL_INFO* symbol; \
|
||||
HANDLE process; \
|
||||
\
|
||||
process = GetCurrentProcess(); \
|
||||
\
|
||||
SymInitialize(process, NULL, TRUE); \
|
||||
\
|
||||
frames = CaptureStackBackTrace(0, STACKSIZE, stack, NULL); \
|
||||
symbol = (SYMBOL_INFO*)calloc(sizeof(SYMBOL_INFO) + 256 * sizeof(char), 1); \
|
||||
if (symbol != NULL) { \
|
||||
symbol->MaxNameLen = 255; \
|
||||
symbol->SizeOfStruct = sizeof(SYMBOL_INFO); \
|
||||
\
|
||||
if (frames > 0) { \
|
||||
taosPrintLog(flags, level, dflag, "obtained %d stack frames", frames); \
|
||||
for (i = 0; i < frames; i++) { \
|
||||
SymFromAddr(process, (DWORD64)(stack[i]), 0, symbol); \
|
||||
taosPrintLog(flags, level, dflag, "frame:%i: %s - 0x%0X", frames - i - 1, symbol->Name, symbol->Address); \
|
||||
} \
|
||||
} \
|
||||
free(symbol); \
|
||||
} \
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -4145,6 +4145,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
qTrace("tsdb/read: %p, take read mutex", pReader);
|
||||
taosThreadMutexLock(&pReader->readerMutex);
|
||||
if (pReader->suspended) {
|
||||
tsdbReaderResume(pReader);
|
||||
|
@ -4154,7 +4155,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
|
||||
pReader->step = EXTERNAL_ROWS_PREV;
|
||||
if (ret) {
|
||||
pStatus = &pReader->innerReader[0]->status;
|
||||
if (pStatus->composedDataBlock) {
|
||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
}
|
||||
|
||||
|
@ -4177,6 +4180,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
bool ret = doTsdbNextDataBlock(pReader);
|
||||
if (ret) {
|
||||
if (pStatus->composedDataBlock) {
|
||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
}
|
||||
|
||||
|
@ -4194,7 +4198,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
|
||||
pReader->step = EXTERNAL_ROWS_NEXT;
|
||||
if (ret1) {
|
||||
pStatus = &pReader->innerReader[1]->status;
|
||||
if (pStatus->composedDataBlock) {
|
||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
}
|
||||
|
||||
|
@ -4202,6 +4208,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
|
||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
|
||||
return false;
|
||||
|
@ -4334,19 +4341,12 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
|
|||
*pBlockSMA = pResBlock->pBlockAgg;
|
||||
pReader->cost.smaDataLoad += 1;
|
||||
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
|
||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
if (pStatus->composedDataBlock) {
|
||||
return pReader->pResBlock;
|
||||
}
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo =
|
||||
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
|
@ -4354,7 +4354,6 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
||||
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -4362,28 +4361,40 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tBlockDataDestroy(&pStatus->fileBlockData);
|
||||
terrno = code;
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
|
||||
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||
return pReader->pResBlock;
|
||||
}
|
||||
|
||||
void tsdbReleaseDataBlock(STsdbReader* pReader) { taosThreadMutexUnlock(&pReader->readerMutex); }
|
||||
void tsdbReleaseDataBlock(STsdbReader* pReader) {
|
||||
// SReaderStatus* pStatus = &pReader->status;
|
||||
// if (!pStatus->composedDataBlock) {
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
//}
|
||||
}
|
||||
|
||||
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||
STsdbReader* pTReader = pReader;
|
||||
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||
if (pReader->step == EXTERNAL_ROWS_PREV) {
|
||||
return doRetrieveDataBlock(pReader->innerReader[0]);
|
||||
pTReader = pReader->innerReader[0];
|
||||
} else if (pReader->step == EXTERNAL_ROWS_NEXT) {
|
||||
return doRetrieveDataBlock(pReader->innerReader[1]);
|
||||
pTReader = pReader->innerReader[1];
|
||||
}
|
||||
}
|
||||
|
||||
return doRetrieveDataBlock(pReader);
|
||||
SReaderStatus* pStatus = &pTReader->status;
|
||||
if (pStatus->composedDataBlock) {
|
||||
return pTReader->pResBlock;
|
||||
}
|
||||
|
||||
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
|
||||
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||
|
@ -4543,7 +4554,9 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
|||
// current table is exhausted, let's try the next table
|
||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||
}
|
||||
tsdbReleaseDataBlock(pReader);
|
||||
|
||||
taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
|
|
|
@ -582,34 +582,34 @@ typedef struct SCtgOperation {
|
|||
#define CTG_LOCK(type, _lock) \
|
||||
do { \
|
||||
if (CTG_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
|
||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRLockLatch(_lock); \
|
||||
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
|
||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWLockLatch(_lock); \
|
||||
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CTG_UNLOCK(type, _lock) \
|
||||
do { \
|
||||
if (CTG_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
|
||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRUnLockLatch(_lock); \
|
||||
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
|
||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWUnLockLatch(_lock); \
|
||||
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ extern SCatalogMgmt gCtgMgmt;
|
|||
SCtgDebug gCTGDebug = {0};
|
||||
|
||||
void ctgdUserCallback(SMetaData *pResult, void *param, int32_t code) {
|
||||
ASSERT(*(int32_t *)param == 1);
|
||||
taosMemoryFree(param);
|
||||
|
||||
qDebug("async call result: %s", tstrerror(code));
|
||||
|
|
|
@ -40,7 +40,9 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
|||
msgNum = taosArrayGetSize(batchRsp.pRsps);
|
||||
}
|
||||
|
||||
ASSERT(taskNum == msgNum || 0 == msgNum);
|
||||
if (ASSERTS(taskNum == msgNum || 0 == msgNum, "taskNum %d mis-match msgNum %d", taskNum, msgNum)) {
|
||||
msgNum = 0;
|
||||
}
|
||||
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
|
||||
TMSG_INFO(cbParam->reqType + 1));
|
||||
|
@ -58,11 +60,19 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
|||
if (msgNum > 0) {
|
||||
pRsp = taosArrayGet(batchRsp.pRsps, i);
|
||||
|
||||
taskMsg.msgType = pRsp->reqType;
|
||||
taskMsg.pData = pRsp->msg;
|
||||
taskMsg.len = pRsp->msgLen;
|
||||
|
||||
ASSERT(pRsp->msgIdx == *msgIdx);
|
||||
if (ASSERTS(pRsp->msgIdx == *msgIdx, "rsp msgIdx %d mis-match msgIdx %d", pRsp->msgIdx, *msgIdx)) {
|
||||
pRsp = &rsp;
|
||||
pRsp->msgIdx = *msgIdx;
|
||||
pRsp->reqType = -1;
|
||||
pRsp->rspCode = 0;
|
||||
taskMsg.msgType = -1;
|
||||
taskMsg.pData = NULL;
|
||||
taskMsg.len = 0;
|
||||
} else {
|
||||
taskMsg.msgType = pRsp->reqType;
|
||||
taskMsg.pData = pRsp->msg;
|
||||
taskMsg.len = pRsp->msgLen;
|
||||
}
|
||||
} else {
|
||||
pRsp = &rsp;
|
||||
pRsp->msgIdx = *msgIdx;
|
||||
|
|
|
@ -40,7 +40,6 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
|
|||
(*pRsp)->numOfCols = htonl(numOfCols);
|
||||
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);
|
||||
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -1666,7 +1666,6 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
rsp->numOfRows = htobe64((int64_t)rowNum);
|
||||
|
||||
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
|
||||
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
|
||||
|
||||
rsp->compLen = htonl(len);
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ typedef struct SDataInserterHandle {
|
|||
SHashObj* pCols;
|
||||
int32_t status;
|
||||
bool queryEnd;
|
||||
bool fullOrderColList;
|
||||
uint64_t useconds;
|
||||
uint64_t cachedSize;
|
||||
TdThreadMutex mutex;
|
||||
|
@ -153,7 +154,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
|||
SSubmitReq2* pReq = *ppReq;
|
||||
SArray* pVals = NULL;
|
||||
int32_t numOfBlks = 0;
|
||||
bool fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -196,7 +196,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
|||
for (int32_t k = 0; k < pTSchema->numOfCols; ++k) { // iterate by column
|
||||
int16_t colIdx = k;
|
||||
const STColumn* pCol = &pTSchema->columns[k];
|
||||
if (!fullCol) {
|
||||
if (!pInserter->fullOrderColList) {
|
||||
int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
|
||||
if (NULL == slotId) {
|
||||
continue;
|
||||
|
@ -439,12 +439,19 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;
|
||||
|
||||
inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
|
||||
false, HASH_NO_LOCK);
|
||||
SNode* pNode = NULL;
|
||||
int32_t i = 0;
|
||||
FOREACH(pNode, pInserterNode->pCols) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
|
||||
if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[i].colId) {
|
||||
inserter->fullOrderColList = false;
|
||||
}
|
||||
++i;
|
||||
}
|
||||
|
||||
tsem_init(&inserter->ready, 0, 0);
|
||||
|
|
|
@ -319,6 +319,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
|||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
|
||||
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
|
||||
|
@ -340,6 +341,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
|||
pCost->filterOutBlocks += 1;
|
||||
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
|
||||
|
||||
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -216,7 +216,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
|
|||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
|
||||
SNode* createStreamOptions(SAstCreateContext* pCxt);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery);
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pStreamName);
|
||||
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
|
||||
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
|
||||
|
|
|
@ -541,9 +541,15 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
|
|||
|
||||
/************************************************ create/drop stream **************************************************/
|
||||
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
|
||||
full_table_name(C) tags_def_opt(F) subtable_opt(G) AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D); }
|
||||
full_table_name(C) col_list_opt(H) tags_def_opt(F) subtable_opt(G)
|
||||
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
|
||||
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
|
||||
|
||||
%type col_list_opt { SNodeList* }
|
||||
%destructor col_list_opt { nodesDestroyList($$); }
|
||||
col_list_opt(A) ::= . { A = NULL; }
|
||||
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
|
||||
|
||||
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
|
||||
|
|
|
@ -1745,21 +1745,20 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
|
|||
}
|
||||
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable,
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery) {
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName);
|
||||
if (NULL != pRealTable) {
|
||||
strcpy(pStmt->targetDbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->targetTabName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
}
|
||||
strcpy(pStmt->targetDbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||
strcpy(pStmt->targetTabName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||
nodesDestroyNode(pRealTable);
|
||||
pStmt->ignoreExists = ignoreExists;
|
||||
pStmt->pOptions = (SStreamOptions*)pOptions;
|
||||
pStmt->pQuery = pQuery;
|
||||
pStmt->pTags = pTags;
|
||||
pStmt->pSubtable = pSubtable;
|
||||
pStmt->pCols = pCols;
|
||||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
|
|
|
@ -355,7 +355,12 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
|
|||
}
|
||||
|
||||
static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) {
|
||||
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
|
||||
int32_t code =
|
||||
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->targetDbName, pStmt->targetTabName, pCxt->pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowDnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
|
|
|
@ -352,7 +352,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
|
|||
code = catalogGetTableMeta(pParCxt->pCatalog, &conn, pName, pMeta);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_TSC_INVALID_TABLE_NAME != code) {
|
||||
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_PAR_TABLE_NOT_EXIST != code) {
|
||||
parserError("0x%" PRIx64 " catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", pCxt->pParseCxt->requestId,
|
||||
tstrerror(code), pName->dbname, pName->tname);
|
||||
}
|
||||
|
@ -4979,10 +4979,10 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSchema* getColSchema(STableMeta* pTableMeta, const char* pColName) {
|
||||
static const SSchema* getColSchema(const STableMeta* pTableMeta, const char* pColName) {
|
||||
int32_t numOfFields = getNumOfTags(pTableMeta) + getNumOfColumns(pTableMeta);
|
||||
for (int32_t i = 0; i < numOfFields; ++i) {
|
||||
SSchema* pSchema = pTableMeta->schema + i;
|
||||
const SSchema* pSchema = pTableMeta->schema + i;
|
||||
if (0 == strcmp(pColName, pSchema->name)) {
|
||||
return pSchema;
|
||||
}
|
||||
|
@ -5002,7 +5002,8 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
|
||||
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt,
|
||||
const STableMeta* pTableMeta) {
|
||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||
if (getNumOfTags(pTableMeta) == 1 && pTagsSchema->type == TSDB_DATA_TYPE_JSON &&
|
||||
(pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG || pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG ||
|
||||
|
@ -5021,7 +5022,7 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
|
|||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Table is not super table");
|
||||
}
|
||||
|
||||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
|
||||
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type ||
|
||||
|
@ -5721,6 +5722,139 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STableMeta* pMeta, SNodeList* pProjections) {
|
||||
if (getNumOfColumns(pMeta) != LIST_LENGTH(pProjections)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
|
||||
}
|
||||
|
||||
SSchema* pSchemas = getTableColumnSchema(pMeta);
|
||||
int32_t index = 0;
|
||||
SNode* pProj = NULL;
|
||||
FOREACH(pProj, pProjections) {
|
||||
SSchema* pSchema = pSchemas + index;
|
||||
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
|
||||
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pProj, dt, &pFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
REPLACE_NODE(pFunc);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SProjColPos {
|
||||
int32_t colId;
|
||||
SNode* pProj;
|
||||
} SProjColPos;
|
||||
|
||||
static int32_t projColPosCompar(const void* l, const void* r) {
|
||||
return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId;
|
||||
}
|
||||
|
||||
static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); }
|
||||
|
||||
static int32_t addProjToProjColPos(STranslateContext* pCxt, const SSchema* pSchema, SNode* pProj, SArray* pProjColPos) {
|
||||
SNode* pNewProj = nodesCloneNode(pProj);
|
||||
if (NULL == pNewProj) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
|
||||
if (!dataTypeEqual(&dt, &((SExprNode*)pNewProj)->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
code = createCastFunc(pCxt, pNewProj, dt, &pFunc);
|
||||
pNewProj = pFunc;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SProjColPos pos = {.colId = pSchema->colId, .pProj = pNewProj};
|
||||
code = (NULL == taosArrayPush(pProjColPos, &pos) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pNewProj);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t adjustOrderOfProjection(STranslateContext* pCxt, SNodeList* pCols, const STableMeta* pMeta,
|
||||
SNodeList** pProjections) {
|
||||
if (LIST_LENGTH(pCols) != LIST_LENGTH(*pProjections)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
|
||||
}
|
||||
|
||||
SArray* pProjColPos = taosArrayInit(LIST_LENGTH(pCols), sizeof(SProjColPos));
|
||||
if (NULL == pProjColPos) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pCol = NULL;
|
||||
SNode* pProj = NULL;
|
||||
FORBOTH(pCol, pCols, pProj, *pProjections) {
|
||||
const SSchema* pSchema = getColSchema(pMeta, ((SColumnNode*)pCol)->colName);
|
||||
code = addProjToProjColPos(pCxt, pSchema, pProj, pProjColPos);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SNodeList* pNewProjections = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArraySort(pProjColPos, projColPosCompar);
|
||||
int32_t num = taosArrayGetSize(pProjColPos);
|
||||
pNewProjections = nodesMakeList();
|
||||
if (NULL == pNewProjections) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) {
|
||||
SProjColPos* pPos = taosArrayGet(pProjColPos, i);
|
||||
code = nodesListStrictAppend(pNewProjections, pPos->pProj);
|
||||
pPos->pProj = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArrayDestroy(pProjColPos);
|
||||
nodesDestroyList(*pProjections);
|
||||
*pProjections = pNewProjections;
|
||||
} else {
|
||||
taosArrayDestroyEx(pProjColPos, projColPosDelete);
|
||||
nodesDestroyList(pNewProjections);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t adjustStreamQueryForExistTableImpl(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
|
||||
const STableMeta* pMeta) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (NULL == pStmt->pCols) {
|
||||
return adjustDataTypeOfProjections(pCxt, pMeta, pSelect->pProjectionList);
|
||||
}
|
||||
return adjustOrderOfProjection(pCxt, pStmt->pCols, pMeta, &pSelect->pProjectionList);
|
||||
}
|
||||
|
||||
static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt,
|
||||
SCMCreateStreamReq* pReq) {
|
||||
STableMeta* pMeta = NULL;
|
||||
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, &pMeta);
|
||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||
if (NULL != pStmt->pCols) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||
}
|
||||
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||
pCxt->createStream = true;
|
||||
int32_t code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
|
||||
|
@ -5733,6 +5867,9 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkStreamQuery(pCxt, pStmt);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = adjustStreamQueryForExistTable(pCxt, pStmt, pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||
|
@ -5771,8 +5908,6 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||
}
|
||||
|
||||
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -7358,12 +7493,12 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||
static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, const STableMeta* pTableMeta,
|
||||
SVAlterTbReq* pReq) {
|
||||
if (2 == getNumOfColumns(pTableMeta)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_COL);
|
||||
}
|
||||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
|
||||
} else if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
|
||||
|
@ -7379,11 +7514,11 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta,
|
||||
static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, const STableMeta* pTableMeta,
|
||||
SVAlterTbReq* pReq) {
|
||||
pReq->colModBytes = calcTypeBytes(pStmt->dataType);
|
||||
pReq->colModType = pStmt->dataType.type;
|
||||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
const SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
|
||||
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type ||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -427,7 +427,7 @@ class MockCatalogServiceImpl {
|
|||
int32_t copyTableSchemaMeta(const string& db, const string& tbname, std::unique_ptr<STableMeta>* dst) const {
|
||||
STableMeta* src = getTableSchemaMeta(db, tbname);
|
||||
if (nullptr == src) {
|
||||
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
}
|
||||
int32_t len = sizeof(STableMeta) + sizeof(SSchema) * (src->tableInfo.numOfTags + src->tableInfo.numOfColumns);
|
||||
dst->reset((STableMeta*)taosMemoryCalloc(1, len));
|
||||
|
|
|
@ -755,14 +755,23 @@ TEST_F(ParserInitialCTest, createStream) {
|
|||
};
|
||||
|
||||
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
|
||||
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
|
||||
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
|
||||
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
|
||||
int8_t createStb = STREAM_CREATE_STABLE_TRUE, int8_t igExists = 0) {
|
||||
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
|
||||
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
|
||||
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
|
||||
expect.igExists = igExists;
|
||||
expect.sql = strdup(pSql);
|
||||
expect.createStb = createStb;
|
||||
expect.triggerType = STREAM_TRIGGER_AT_ONCE;
|
||||
expect.maxDelay = 0;
|
||||
expect.watermark = 0;
|
||||
expect.fillHistory = STREAM_DEFAULT_FILL_HISTORY;
|
||||
expect.igExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
|
||||
};
|
||||
|
||||
auto setStreamOptions = [&](int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int64_t watermark = 0,
|
||||
int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
|
||||
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
|
||||
expect.triggerType = triggerType;
|
||||
expect.maxDelay = maxDelay;
|
||||
expect.watermark = watermark;
|
||||
|
@ -813,19 +822,22 @@ TEST_F(ParserInitialCTest, createStream) {
|
|||
ASSERT_EQ(pField->flags, pExpectField->flags);
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(req.checkpointFreq, expect.checkpointFreq);
|
||||
ASSERT_EQ(req.createStb, expect.createStb);
|
||||
tFreeSCMCreateStreamReq(&req);
|
||||
});
|
||||
|
||||
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1");
|
||||
run("CREATE STREAM s1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
|
||||
setCreateStreamReq("s1", "test", "create stream s1 into st3 as select count(*) from t1 interval(10s)", "st3");
|
||||
run("CREATE STREAM s1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
|
||||
clearCreateStreamReq();
|
||||
|
||||
setCreateStreamReq(
|
||||
"s1", "test",
|
||||
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 "
|
||||
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st3 "
|
||||
"as select count(*) from t1 interval(10s)",
|
||||
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
|
||||
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS "
|
||||
"st3", 1, 1);
|
||||
setStreamOptions(STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
|
||||
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS "
|
||||
"SELECT COUNT(*) "
|
||||
"FROM t1 INTERVAL(10S)");
|
||||
clearCreateStreamReq();
|
||||
|
@ -839,6 +851,11 @@ TEST_F(ParserInitialCTest, createStream) {
|
|||
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
|
||||
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)");
|
||||
clearCreateStreamReq();
|
||||
|
||||
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)", "st1",
|
||||
STREAM_CREATE_STABLE_FALSE);
|
||||
run("CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)");
|
||||
clearCreateStreamReq();
|
||||
}
|
||||
|
||||
TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
|
||||
|
|
|
@ -30,7 +30,7 @@ TEST_F(PlanOtherTest, createTopic) {
|
|||
TEST_F(PlanOtherTest, createStream) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("create stream if not exists s1 trigger window_close watermark 10s into st1 as select count(*) from t1 "
|
||||
run("create stream if not exists s1 trigger window_close watermark 10s into st3 as select count(*) from t1 "
|
||||
"interval(10s)");
|
||||
|
||||
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
|
||||
|
@ -43,9 +43,9 @@ TEST_F(PlanOtherTest, createStream) {
|
|||
TEST_F(PlanOtherTest, createStreamUseSTable) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 INTERVAL(10s)");
|
||||
run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 INTERVAL(10s)");
|
||||
|
||||
run("CREATE STREAM IF NOT EXISTS s1 into st1 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
|
||||
run("CREATE STREAM IF NOT EXISTS s1 into st3 as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
|
||||
}
|
||||
|
||||
TEST_F(PlanOtherTest, createSmaIndex) {
|
||||
|
|
|
@ -116,8 +116,6 @@ int32_t cleanupTaskQueue() {
|
|||
}
|
||||
|
||||
static void execHelper(struct SSchedMsg* pSchedMsg) {
|
||||
assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL);
|
||||
|
||||
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
|
||||
int32_t code = execFn(pSchedMsg->thandle);
|
||||
if (code != 0 && pSchedMsg->msg != NULL) {
|
||||
|
@ -126,8 +124,6 @@ static void execHelper(struct SSchedMsg* pSchedMsg) {
|
|||
}
|
||||
|
||||
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
|
||||
assert(execFn != NULL);
|
||||
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = execHelper;
|
||||
schedMsg.ahandle = execFn;
|
||||
|
@ -138,7 +134,10 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
|
|||
}
|
||||
|
||||
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
||||
assert(pMsgBody != NULL);
|
||||
if (NULL == pMsgBody) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pMsgBody->target.dbFName);
|
||||
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
|
||||
if (pMsgBody->paramFreeFp) {
|
||||
|
@ -394,7 +393,7 @@ char* parseTagDatatoJson(void* p) {
|
|||
} else if (pTagVal->nData == 0) {
|
||||
value = cJSON_CreateString("");
|
||||
} else {
|
||||
ASSERT(0);
|
||||
goto end;
|
||||
}
|
||||
|
||||
cJSON_AddItemToObject(json, tagJsonKey, value);
|
||||
|
@ -413,7 +412,7 @@ char* parseTagDatatoJson(void* p) {
|
|||
}
|
||||
cJSON_AddItemToObject(json, tagJsonKey, value);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
|
|
@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_LOCK(type, _lock) \
|
||||
do { \
|
||||
if (QW_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
|
||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRLockLatch(_lock); \
|
||||
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
|
||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWLockLatch(_lock); \
|
||||
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define QW_UNLOCK(type, _lock) \
|
||||
do { \
|
||||
if (QW_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
|
||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRUnLockLatch(_lock); \
|
||||
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
|
||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWUnLockLatch(_lock); \
|
||||
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
|
|
@ -147,7 +147,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
|||
size_t numOfResBlock = taosArrayGetSize(pResList);
|
||||
for (int32_t j = 0; j < numOfResBlock; ++j) {
|
||||
SSDataBlock *pRes = taosArrayGetP(pResList, j);
|
||||
ASSERT(pRes->info.rows > 0);
|
||||
|
||||
SInputData inputData = {.pData = pRes};
|
||||
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
|
||||
|
|
|
@ -478,34 +478,34 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_LOCK(type, _lock) \
|
||||
do { \
|
||||
if (SCH_READ == (type)) { \
|
||||
assert(atomic_load_32(_lock) >= 0); \
|
||||
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock"); \
|
||||
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRLockLatch(_lock); \
|
||||
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32(_lock) > 0); \
|
||||
ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock"); \
|
||||
} else { \
|
||||
assert(atomic_load_32(_lock) >= 0); \
|
||||
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock"); \
|
||||
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWLockLatch(_lock); \
|
||||
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define SCH_UNLOCK(type, _lock) \
|
||||
do { \
|
||||
if (SCH_READ == (type)) { \
|
||||
assert(atomic_load_32((_lock)) > 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
|
||||
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRUnLockLatch(_lock); \
|
||||
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
|
||||
} else { \
|
||||
assert(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
|
||||
ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
|
||||
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWUnLockLatch(_lock); \
|
||||
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
assert(atomic_load_32((_lock)) >= 0); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ target_link_libraries(
|
|||
)
|
||||
if(TD_WINDOWS)
|
||||
target_link_libraries(
|
||||
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm crashdump
|
||||
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm crashdump dbghelp
|
||||
)
|
||||
elseif(TD_DARWIN_64)
|
||||
find_library(CORE_FOUNDATION_FRAMEWORK CoreFoundation)
|
||||
|
@ -59,4 +59,8 @@ endif()
|
|||
|
||||
IF (JEMALLOC_ENABLED)
|
||||
target_link_libraries(os PUBLIC -ljemalloc)
|
||||
ENDIF ()
|
||||
ENDIF ()
|
||||
|
||||
if(${BUILD_TEST})
|
||||
add_subdirectory(test)
|
||||
endif(${BUILD_TEST})
|
|
@ -0,0 +1,24 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
|
||||
PROJECT(TDengine)
|
||||
|
||||
FIND_PATH(HEADER_GTEST_INCLUDE_DIR gtest.h /usr/include/gtest /usr/local/include/gtest)
|
||||
FIND_LIBRARY(LIB_GTEST_STATIC_DIR libgtest.a /usr/lib/ /usr/local/lib /usr/lib64)
|
||||
FIND_LIBRARY(LIB_GTEST_SHARED_DIR libgtest.so /usr/lib/ /usr/local/lib /usr/lib64)
|
||||
|
||||
IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR))
|
||||
MESSAGE(STATUS "gTest library found, build os test")
|
||||
|
||||
INCLUDE_DIRECTORIES(${HEADER_GTEST_INCLUDE_DIR})
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
ENDIF()
|
||||
|
||||
INCLUDE_DIRECTORIES(${TD_SOURCE_DIR}/src/util/inc)
|
||||
|
||||
# osTests
|
||||
add_executable(osTests "osTests.cpp")
|
||||
target_link_libraries(osTests os util gtest_main)
|
||||
add_test(
|
||||
NAME osTests
|
||||
COMMAND osTests
|
||||
)
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <iostream>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
#pragma GCC diagnostic ignored "-Wformat"
|
||||
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
|
||||
#pragma GCC diagnostic ignored "-Wpointer-arith"
|
||||
|
||||
#include "os.h"
|
||||
#include "tlog.h"
|
||||
|
||||
TEST(osTest, osSystem) {
|
||||
const char *flags = "UTL FATAL ";
|
||||
ELogLevel level = DEBUG_FATAL;
|
||||
int32_t dflag = 255; // tsLogEmbedded ? 255 : uDebugFlag
|
||||
taosPrintTrace(flags, level, dflag);
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
|
@ -42,4 +42,24 @@ if $rows != 2 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print ======== step2
|
||||
sql drop database if exists db1;
|
||||
sql create database db1 vgroups 1;
|
||||
sql use db1;
|
||||
sql create table t1(ts timestamp, a int, b int );
|
||||
sql create table t2(ts timestamp, a int, b int );
|
||||
sql insert into t1 values(1648791211000,1,2);
|
||||
sql insert into t2 (ts, b, a) select ts, a, b from t1;
|
||||
sql select * from t2;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data02 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue