Merge from master
This commit is contained in:
commit
adb5b0f927
|
@ -52,12 +52,18 @@ def pre_test(){
|
|||
git checkout master
|
||||
'''
|
||||
}
|
||||
else {
|
||||
else if(env.CHANGE_TARGET == '2.0'){
|
||||
sh '''
|
||||
cd ${WKC}
|
||||
git checkout 2.0
|
||||
'''
|
||||
}
|
||||
else{
|
||||
sh '''
|
||||
cd ${WKC}
|
||||
git checkout develop
|
||||
'''
|
||||
}
|
||||
}
|
||||
}
|
||||
sh'''
|
||||
cd ${WKC}
|
||||
|
@ -75,7 +81,13 @@ def pre_test(){
|
|||
git checkout master
|
||||
'''
|
||||
}
|
||||
else {
|
||||
else if(env.CHANGE_TARGET == '2.0'){
|
||||
sh '''
|
||||
cd ${WK}
|
||||
git checkout 2.0
|
||||
'''
|
||||
}
|
||||
else{
|
||||
sh '''
|
||||
cd ${WK}
|
||||
git checkout develop
|
||||
|
|
|
@ -991,9 +991,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
|||
index = 0;
|
||||
sToken = tStrGetToken(*str, &index, false);
|
||||
if (sToken.n == 0 || sToken.type != TK_RP) {
|
||||
tscSQLSyntaxErrMsg(pInsertParam->msg, ") expected", *str);
|
||||
code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||
return code;
|
||||
return tscSQLSyntaxErrMsg(pInsertParam->msg, ") expected", *str);
|
||||
}
|
||||
|
||||
*str += index;
|
||||
|
|
|
@ -471,6 +471,7 @@ typedef struct {
|
|||
|
||||
bool stableQuery; // super table query or not
|
||||
bool topBotQuery; // TODO used bitwise flag
|
||||
bool interpQuery; // interp query or not
|
||||
bool groupbyColumn; // denote if this is a groupby normal column query
|
||||
bool hasTagResults; // if there are tag values in final result or not
|
||||
bool timeWindowInterpo;// if the time window start/end required interpolation
|
||||
|
|
|
@ -75,6 +75,7 @@ extern char configDir[];
|
|||
#define BUFFER_SIZE TSDB_MAX_ALLOWED_SQL_LEN
|
||||
#define COND_BUF_LEN (BUFFER_SIZE - 30)
|
||||
#define COL_BUFFER_LEN ((TSDB_COL_NAME_LEN + 15) * TSDB_MAX_COLUMNS)
|
||||
|
||||
#define MAX_USERNAME_SIZE 64
|
||||
#define MAX_PASSWORD_SIZE 16
|
||||
#define MAX_HOSTNAME_SIZE 253 // https://man7.org/linux/man-pages/man7/hostname.7.html
|
||||
|
@ -1413,6 +1414,7 @@ static char *rand_float_str()
|
|||
return g_randfloat_buff + (cursor * FLOAT_BUFF_LEN);
|
||||
}
|
||||
|
||||
|
||||
static float rand_float()
|
||||
{
|
||||
static int cursor;
|
||||
|
|
|
@ -253,11 +253,15 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
int32_t connId = htonl(pHBMsg->connId);
|
||||
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
|
||||
if (pConn == NULL) {
|
||||
pHBMsg->pid = htonl(pHBMsg->pid);
|
||||
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
|
||||
}
|
||||
|
||||
if (pConn == NULL) {
|
||||
// do not close existing links, otherwise
|
||||
// mError("failed to create connId, close connect");
|
||||
// pRsp->killConnection = 1;
|
||||
// pRsp->killConnection = 1;
|
||||
} else {
|
||||
pRsp->connId = htonl(pConn->connId);
|
||||
mnodeSaveQueryStreamList(pConn, pHBMsg);
|
||||
|
|
|
@ -65,7 +65,14 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
}
|
||||
|
||||
int32_t code = mnodeInitMsg(pMsg);
|
||||
int32_t code = grantCheck(TSDB_GRANT_TIME);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType],
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = mnodeInitMsg(pMsg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType],
|
||||
tstrerror(code));
|
||||
|
|
|
@ -331,6 +331,8 @@ enum OPERATOR_TYPE_E {
|
|||
OP_Distinct = 20,
|
||||
OP_Join = 21,
|
||||
OP_StateWindow = 22,
|
||||
OP_AllTimeWindow = 23,
|
||||
OP_AllMultiTableTimeInterval = 24,
|
||||
};
|
||||
|
||||
typedef struct SOperatorInfo {
|
||||
|
@ -549,11 +551,13 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
|
|||
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
|
||||
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
|
|
|
@ -3708,27 +3708,59 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
} else {
|
||||
// no data generated yet
|
||||
if (pCtx->size == 1) {
|
||||
if (pCtx->size < 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// check the timestamp in input buffer
|
||||
TSKEY skey = GET_TS_DATA(pCtx, 0);
|
||||
TSKEY ekey = GET_TS_DATA(pCtx, 1);
|
||||
|
||||
// no data generated yet
|
||||
if (!(skey < pCtx->startTs && ekey > pCtx->startTs)) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs);
|
||||
|
||||
if (type == TSDB_FILL_PREV) {
|
||||
if (skey > pCtx->startTs) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pCtx->size > 1) {
|
||||
TSKEY ekey = GET_TS_DATA(pCtx, 1);
|
||||
if (ekey > skey && ekey <= pCtx->startTs) {
|
||||
skey = ekey;
|
||||
}
|
||||
}
|
||||
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType);
|
||||
} else if (type == TSDB_FILL_NEXT) {
|
||||
char* val = ((char*)pCtx->pInput) + pCtx->inputBytes;
|
||||
TSKEY ekey = skey;
|
||||
char* val = NULL;
|
||||
|
||||
if (ekey < pCtx->startTs) {
|
||||
if (pCtx->size > 1) {
|
||||
ekey = GET_TS_DATA(pCtx, 1);
|
||||
if (ekey < pCtx->startTs) {
|
||||
return;
|
||||
}
|
||||
|
||||
val = ((char*)pCtx->pInput) + pCtx->inputBytes;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
val = (char*)pCtx->pInput;
|
||||
}
|
||||
|
||||
assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType);
|
||||
} else if (type == TSDB_FILL_LINEAR) {
|
||||
if (pCtx->size <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
TSKEY ekey = GET_TS_DATA(pCtx, 1);
|
||||
|
||||
// no data generated yet
|
||||
if (!(skey < pCtx->startTs && ekey > pCtx->startTs)) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert(pCtx->start.key == INT64_MIN && skey < pCtx->startTs && ekey > pCtx->startTs);
|
||||
|
||||
char *start = GET_INPUT_DATA(pCtx, 0);
|
||||
char *end = GET_INPUT_DATA(pCtx, 1);
|
||||
|
||||
|
|
|
@ -448,6 +448,44 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
|
|||
pResultRowInfo->capacity = (int32_t)newCapacity;
|
||||
}
|
||||
|
||||
static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
|
||||
int16_t bytes, bool masterscan, uint64_t uid) {
|
||||
bool existed = false;
|
||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
|
||||
|
||||
SResultRow **p1 =
|
||||
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
|
||||
// in case of repeat scan/reverse scan, no new time window added.
|
||||
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
|
||||
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
|
||||
return p1 != NULL;
|
||||
}
|
||||
|
||||
if (p1 != NULL) {
|
||||
if (pResultRowInfo->size == 0) {
|
||||
existed = false;
|
||||
assert(pResultRowInfo->curPos == -1);
|
||||
} else if (pResultRowInfo->size == 1) {
|
||||
existed = (pResultRowInfo->pResult[0] == (*p1));
|
||||
} else { // check if current pResultRowInfo contains the existed pResultRow
|
||||
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
|
||||
int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
|
||||
if (index != NULL) {
|
||||
existed = true;
|
||||
} else {
|
||||
existed = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return existed;
|
||||
}
|
||||
|
||||
return p1 != NULL;
|
||||
}
|
||||
|
||||
|
||||
static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
|
||||
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
||||
bool existed = false;
|
||||
|
@ -592,6 +630,35 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
|
|||
return w;
|
||||
}
|
||||
|
||||
// get the correct time window according to the handled timestamp
|
||||
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
|
||||
STimeWindow w = {0};
|
||||
|
||||
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
|
||||
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
||||
|
||||
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
|
||||
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
|
||||
} else {
|
||||
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
||||
}
|
||||
} else {
|
||||
w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win;
|
||||
}
|
||||
|
||||
/*
|
||||
* query border check, skey should not be bounded by the query time range, since the value skey will
|
||||
* be used as the time window index value. So we only change ekey of time window accordingly.
|
||||
*/
|
||||
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
||||
w.ekey = pQueryAttr->window.ekey;
|
||||
}
|
||||
|
||||
return w;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// a new buffer page for each table. Needs to opt this design
|
||||
static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, uint32_t size) {
|
||||
if (pWindowRes->pageId != -1) {
|
||||
|
@ -637,6 +704,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool chkWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
|
||||
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||
assert(win->skey <= win->ekey);
|
||||
|
||||
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
||||
}
|
||||
|
||||
static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
|
||||
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||
|
@ -707,7 +782,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
|
|||
}
|
||||
}
|
||||
|
||||
assert(forwardStep > 0);
|
||||
assert(forwardStep >= 0);
|
||||
return forwardStep;
|
||||
}
|
||||
|
||||
|
@ -764,6 +839,8 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
|||
pResultRowInfo->curPos = i + 1; // current not closed result object
|
||||
}
|
||||
}
|
||||
|
||||
//pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
|
||||
}
|
||||
|
||||
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) {
|
||||
|
@ -813,7 +890,7 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
|
|||
}
|
||||
}
|
||||
|
||||
assert(num > 0);
|
||||
assert(num >= 0);
|
||||
return num;
|
||||
}
|
||||
|
||||
|
@ -973,6 +1050,11 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext
|
|||
}
|
||||
}
|
||||
|
||||
/* interp query with fill should not skip time window */
|
||||
if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
|
||||
return startPos;
|
||||
}
|
||||
|
||||
/*
|
||||
* This time window does not cover any data, try next time window,
|
||||
* this case may happen when the time window is too small
|
||||
|
@ -1485,6 +1567,82 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
|
||||
}
|
||||
|
||||
|
||||
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
|
||||
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
|
||||
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
|
||||
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||
|
||||
TSKEY* tsCols = NULL;
|
||||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||
tsCols = (int64_t*) pColDataInfo->pData;
|
||||
assert(tsCols[0] == pSDataBlock->info.window.skey &&
|
||||
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
||||
}
|
||||
|
||||
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
|
||||
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
|
||||
|
||||
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
|
||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardStep = 0;
|
||||
int32_t ret = 0;
|
||||
|
||||
while (1) {
|
||||
// null data, failed to allocate more memory buffer
|
||||
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult,
|
||||
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
TSKEY ekey = reviseWindowEkey(pQueryAttr, &win);
|
||||
forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
|
||||
|
||||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
|
||||
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
|
||||
|
||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
||||
startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
|
||||
if (startPos < 0) {
|
||||
if (win.skey <= pQueryAttr->window.ekey) {
|
||||
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId,
|
||||
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
startPos = pSDataBlock->info.rows - 1;
|
||||
|
||||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
|
||||
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||
}
|
||||
|
||||
if (pQueryAttr->timeWindowInterpo) {
|
||||
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
|
||||
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
|
||||
}
|
||||
|
||||
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
STableQueryInfo* item = pRuntimeEnv->current;
|
||||
|
@ -1981,6 +2139,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
case OP_AllMultiTableTimeInterval: {
|
||||
pRuntimeEnv->proot =
|
||||
createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
case OP_TimeWindow: {
|
||||
pRuntimeEnv->proot =
|
||||
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
|
@ -1990,6 +2154,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
break;
|
||||
}
|
||||
case OP_AllTimeWindow: {
|
||||
pRuntimeEnv->proot =
|
||||
createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
|
||||
if (opType != OP_DummyInput && opType != OP_Join) {
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case OP_Groupby: {
|
||||
pRuntimeEnv->proot =
|
||||
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
|
@ -2918,6 +3091,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
|
||||
// check if this data block is required to load
|
||||
if ((*status) != BLK_DATA_ALL_NEEDED) {
|
||||
bool needFilter = true;
|
||||
|
||||
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
|
||||
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
|
||||
|
@ -2927,10 +3102,16 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
||||
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
if (pQueryAttr->pointInterpQuery) {
|
||||
needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
|
||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||
pTableScanInfo->rowCellInfoOffset);
|
||||
} else {
|
||||
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
|
||||
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
|
||||
|
@ -2938,7 +3119,11 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
pRuntimeEnv->current->groupIndex);
|
||||
}
|
||||
|
||||
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
|
||||
if (needFilter) {
|
||||
(*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
|
||||
} else {
|
||||
(*status) = BLK_DATA_ALL_NEEDED;
|
||||
}
|
||||
}
|
||||
|
||||
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
||||
|
@ -4546,6 +4731,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
|
|||
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
||||
pQueryAttr->tsdb = tsdb;
|
||||
|
||||
|
||||
if (tsdb != NULL) {
|
||||
int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4954,7 +5140,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
|
|||
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
||||
} else if (pDownstream->operatorType == OP_TimeWindow) {
|
||||
} else if (pDownstream->operatorType == OP_TimeWindow || pDownstream->operatorType == OP_AllTimeWindow) {
|
||||
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
|
||||
|
@ -4968,7 +5154,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
|
|||
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset;
|
||||
|
||||
} else if (pDownstream->operatorType == OP_MultiTableTimeInterval) {
|
||||
} else if (pDownstream->operatorType == OP_MultiTableTimeInterval || pDownstream->operatorType == OP_AllMultiTableTimeInterval) {
|
||||
STableIntervalOperatorInfo *pInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pInfo->pCtx;
|
||||
|
@ -5587,6 +5773,66 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
|
|||
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
|
||||
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
int32_t order = pQueryAttr->order.order;
|
||||
STimeWindow win = pQueryAttr->window;
|
||||
|
||||
SOperatorInfo* upstream = pOperator->upstream[0];
|
||||
|
||||
while(1) {
|
||||
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
|
||||
hashAllIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0);
|
||||
}
|
||||
|
||||
// restore the value
|
||||
pQueryAttr->order.order = order;
|
||||
pQueryAttr->window = win;
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pIntervalInfo->resultRowInfo);
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
|
||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -5642,6 +5888,63 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
|||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
int32_t order = pQueryAttr->order.order;
|
||||
|
||||
SOperatorInfo* upstream = pOperator->upstream[0];
|
||||
|
||||
while(1) {
|
||||
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||
|
||||
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
||||
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
|
||||
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
|
||||
|
||||
hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
|
||||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
pQueryAttr->order.order = order; // TODO : restore the order
|
||||
doCloseAllTimeWindow(pRuntimeEnv);
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
@ -6263,6 +6566,32 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
|
|||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
||||
|
||||
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
|
||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
pOperator->name = "AllTimeIntervalAggOperator";
|
||||
pOperator->operatorType = OP_AllTimeWindow;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->exec = doAllIntervalAgg;
|
||||
pOperator->cleanup = destroyBasicOperatorInfo;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
|
||||
pInfo->colIndex = -1;
|
||||
|
@ -6285,7 +6614,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
|
|||
|
||||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
|
||||
}
|
||||
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
|
||||
|
@ -6337,6 +6665,32 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
||||
|
||||
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
|
||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "AllMultiTableTimeIntervalOperator";
|
||||
pOperator->operatorType = OP_AllMultiTableTimeInterval;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
|
||||
pOperator->exec = doAllSTableIntervalAgg;
|
||||
pOperator->cleanup = destroyBasicOperatorInfo;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
||||
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
|
||||
pInfo->colIndex = -1; // group by column index
|
||||
|
|
|
@ -567,10 +567,18 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
}
|
||||
} else if (pQueryAttr->interval.interval > 0) {
|
||||
if (pQueryAttr->stableQuery) {
|
||||
op = OP_MultiTableTimeInterval;
|
||||
if (pQueryAttr->pointInterpQuery) {
|
||||
op = OP_AllMultiTableTimeInterval;
|
||||
} else {
|
||||
op = OP_MultiTableTimeInterval;
|
||||
}
|
||||
taosArrayPush(plan, &op);
|
||||
} else {
|
||||
op = OP_TimeWindow;
|
||||
} else {
|
||||
if (pQueryAttr->pointInterpQuery) {
|
||||
op = OP_AllTimeWindow;
|
||||
} else {
|
||||
op = OP_TimeWindow;
|
||||
}
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
if (pQueryAttr->pExpr2 != NULL) {
|
||||
|
@ -578,7 +586,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
||||
if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) {
|
||||
if (pQueryAttr->fillType != TSDB_FILL_NONE) {
|
||||
op = OP_Fill;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
|
|
@ -24,8 +24,7 @@ typedef struct STable {
|
|||
tstr* name; // NOTE: there a flexible string here
|
||||
uint64_t suid;
|
||||
struct STable* pSuper; // super table pointer
|
||||
uint8_t numOfSchemas;
|
||||
STSchema* schema[TSDB_MAX_TABLE_SCHEMAS];
|
||||
SArray* schema;
|
||||
STSchema* tagSchema;
|
||||
SKVRow tagVal;
|
||||
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
||||
|
@ -107,10 +106,9 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
|
|||
|
||||
if (lock) TSDB_RLOCK_TABLE(pDTable);
|
||||
if (_version < 0) { // get the latest version of schema
|
||||
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
|
||||
pTSchema = *(STSchema **)taosArrayGetLast(pDTable->schema);
|
||||
} else { // get the schema with version
|
||||
void* ptr = taosbsearch(&_version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*),
|
||||
tsdbCompareSchemaVersion, TD_EQ);
|
||||
void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||
goto _exit;
|
||||
|
|
|
@ -44,6 +44,8 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
|
|||
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
|
||||
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
|
||||
static int tsdbAddSchema(STable *pTable, STSchema *pSchema);
|
||||
static void tsdbFreeTableSchema(STable *pTable);
|
||||
|
||||
// ------------------ OUTER FUNCTIONS ------------------
|
||||
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
|
||||
|
@ -723,17 +725,10 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
|
|||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
||||
ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1]));
|
||||
ASSERT(schemaVersion(pSchema) > schemaVersion(*(STSchema **)taosArrayGetLast(pCTable->schema)));
|
||||
|
||||
TSDB_WLOCK_TABLE(pCTable);
|
||||
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
|
||||
pCTable->schema[pCTable->numOfSchemas++] = pSchema;
|
||||
} else {
|
||||
ASSERT(pCTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
|
||||
tdFreeSchema(pCTable->schema[0]);
|
||||
memmove(pCTable->schema, pCTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
|
||||
pCTable->schema[pCTable->numOfSchemas - 1] = pSchema;
|
||||
}
|
||||
tsdbAddSchema(pCTable, pSchema);
|
||||
|
||||
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
|
||||
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
|
||||
|
@ -829,9 +824,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
|
|||
TABLE_TID(pTable) = -1;
|
||||
TABLE_SUID(pTable) = -1;
|
||||
pTable->pSuper = NULL;
|
||||
pTable->numOfSchemas = 1;
|
||||
pTable->schema[0] = tdDupSchema(pCfg->schema);
|
||||
if (pTable->schema[0] == NULL) {
|
||||
if (tsdbAddSchema(pTable, tdDupSchema(pCfg->schema)) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
@ -842,7 +835,8 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
|
|||
}
|
||||
pTable->tagVal = NULL;
|
||||
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
|
||||
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL, SL_ALLOW_DUP_KEY, getTagIndexKey);
|
||||
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
|
||||
SL_ALLOW_DUP_KEY, getTagIndexKey);
|
||||
if (pTable->pIndex == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -871,9 +865,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
|
|||
}
|
||||
} else {
|
||||
TABLE_SUID(pTable) = -1;
|
||||
pTable->numOfSchemas = 1;
|
||||
pTable->schema[0] = tdDupSchema(pCfg->schema);
|
||||
if (pTable->schema[0] == NULL) {
|
||||
if (tsdbAddSchema(pTable, tdDupSchema(pCfg->schema)) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
@ -907,9 +899,7 @@ static void tsdbFreeTable(STable *pTable) {
|
|||
TABLE_UID(pTable));
|
||||
tfree(TABLE_NAME(pTable));
|
||||
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
||||
for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) {
|
||||
tdFreeSchema(pTable->schema[i]);
|
||||
}
|
||||
tsdbFreeTableSchema(pTable);
|
||||
|
||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||
tdFreeSchema(pTable->tagSchema);
|
||||
|
@ -1261,9 +1251,10 @@ static int tsdbEncodeTable(void **buf, STable *pTable) {
|
|||
tlen += taosEncodeFixedU64(buf, TABLE_SUID(pTable));
|
||||
tlen += tdEncodeKVRow(buf, pTable->tagVal);
|
||||
} else {
|
||||
tlen += taosEncodeFixedU8(buf, pTable->numOfSchemas);
|
||||
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||
tlen += tdEncodeSchema(buf, pTable->schema[i]);
|
||||
tlen += taosEncodeFixedU8(buf, (uint8_t)taosArrayGetSize(pTable->schema));
|
||||
for (int i = 0; i < taosArrayGetSize(pTable->schema); i++) {
|
||||
STSchema *pSchema = taosArrayGetP(pTable->schema, i);
|
||||
tlen += tdEncodeSchema(buf, pSchema);
|
||||
}
|
||||
|
||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||
|
@ -1294,9 +1285,12 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
|
|||
buf = taosDecodeFixedU64(buf, &TABLE_SUID(pTable));
|
||||
buf = tdDecodeKVRow(buf, &(pTable->tagVal));
|
||||
} else {
|
||||
buf = taosDecodeFixedU8(buf, &(pTable->numOfSchemas));
|
||||
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||
buf = tdDecodeSchema(buf, &(pTable->schema[i]));
|
||||
uint8_t nSchemas;
|
||||
buf = taosDecodeFixedU8(buf, &nSchemas);
|
||||
for (int i = 0; i < nSchemas; i++) {
|
||||
STSchema *pSchema;
|
||||
buf = tdDecodeSchema(buf, &pSchema);
|
||||
tsdbAddSchema(pTable, pSchema);
|
||||
}
|
||||
|
||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||
|
@ -1458,3 +1452,38 @@ static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbAddSchema(STable *pTable, STSchema *pSchema) {
|
||||
ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);
|
||||
|
||||
if (pTable->schema == NULL) {
|
||||
pTable->schema = taosArrayInit(TSDB_MAX_TABLE_SCHEMAS, sizeof(SSchema *));
|
||||
if (pTable->schema == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(taosArrayGetSize(pTable->schema) == 0 ||
|
||||
schemaVersion(pSchema) > schemaVersion(*(STSchema **)taosArrayGetLast(pTable->schema)));
|
||||
|
||||
if (taosArrayPush(pTable->schema, &pSchema) == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbFreeTableSchema(STable *pTable) {
|
||||
ASSERT(pTable != NULL);
|
||||
|
||||
if (pTable->schema) {
|
||||
for (size_t i = 0; i < taosArrayGetSize(pTable->schema); i++) {
|
||||
STSchema *pSchema = taosArrayGetP(pTable->schema, i);
|
||||
tdFreeSchema(pSchema);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTable->schema);
|
||||
}
|
||||
}
|
|
@ -2693,7 +2693,7 @@ static void destroyHelper(void* param) {
|
|||
free(param);
|
||||
}
|
||||
|
||||
static bool loadBlockOfActiveTable(STsdbQueryHandle* pQueryHandle) {
|
||||
static bool loadBlockOfActiveTable(STsdbQueryHandle* pQueryHandle) {
|
||||
if (pQueryHandle->checkFiles) {
|
||||
// check if the query range overlaps with the file data block
|
||||
bool exists = true;
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import random
|
||||
import string
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.dnodes import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def genColList(self):
|
||||
'''
|
||||
generate column list
|
||||
'''
|
||||
col_list = list()
|
||||
for i in range(1, 18):
|
||||
col_list.append(f'c{i}')
|
||||
return col_list
|
||||
|
||||
def genIncreaseValue(self, input_value):
|
||||
'''
|
||||
add ', 1' to end of value every loop
|
||||
'''
|
||||
value_list = list(input_value)
|
||||
value_list.insert(-1, ", 1")
|
||||
return ''.join(value_list)
|
||||
|
||||
def insertAlter(self):
|
||||
'''
|
||||
after each alter and insert, when execute 'select * from {tbname};' taosd will coredump
|
||||
'''
|
||||
tbname = ''.join(random.choice(string.ascii_letters.lower()) for i in range(7))
|
||||
input_value = '(now, 1)'
|
||||
tdSql.execute(f'create table {tbname} (ts timestamp, c0 int);')
|
||||
tdSql.execute(f'insert into {tbname} values {input_value};')
|
||||
for col in self.genColList():
|
||||
input_value = self.genIncreaseValue(input_value)
|
||||
tdSql.execute(f'alter table {tbname} add column {col} int;')
|
||||
tdSql.execute(f'insert into {tbname} values {input_value};')
|
||||
tdSql.query(f'select * from {tbname};')
|
||||
tdSql.checkRows(18)
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.insertAlter()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -381,6 +381,7 @@ python3 ./test.py -f query/querySession.py
|
|||
python3 test.py -f alter/alter_create_exception.py
|
||||
python3 ./test.py -f insert/flushwhiledrop.py
|
||||
python3 ./test.py -f insert/schemalessInsert.py
|
||||
python3 ./test.py -f alter/alterColMultiTimes.py
|
||||
|
||||
#======================p4-end===============
|
||||
|
||||
|
|
|
@ -47,7 +47,6 @@ class TDTestCase:
|
|||
else:
|
||||
tdLog.info("taosd found in %s" % buildPath)
|
||||
binPath = buildPath + "/build/bin/"
|
||||
|
||||
# insert: create one or mutiple tables per sql and insert multiple rows per sql
|
||||
# insert data from a special timestamp
|
||||
# check stable stb0
|
||||
|
@ -90,7 +89,6 @@ class TDTestCase:
|
|||
os.system(
|
||||
"%staosdemo -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestNanoDatabaseNow.json -y " %
|
||||
binPath)
|
||||
|
||||
tdSql.execute("use nsdb2")
|
||||
tdSql.query("show stables")
|
||||
tdSql.checkData(0, 4, 100)
|
||||
|
|
|
@ -103,7 +103,6 @@ class TDTestCase:
|
|||
|
||||
os.system("cat subscribe_res0.txt* > all_subscribe_res0.txt")
|
||||
subTimes0 = self.subTimes("all_subscribe_res0.txt")
|
||||
print("pass")
|
||||
self.assertCheck("all_subscribe_res0.txt",subTimes0 ,202)
|
||||
|
||||
|
||||
|
|
|
@ -55,6 +55,9 @@ while $i < $halfNum
|
|||
endw
|
||||
print ====== tables created
|
||||
|
||||
sql create table ap1 (ts timestamp, pav float);
|
||||
sql INSERT INTO ap1 VALUES ('2021-07-25 02:19:54.100',1) ('2021-07-25 02:19:54.200',2) ('2021-07-25 02:19:54.300',3) ('2021-07-25 02:19:56.500',4) ('2021-07-25 02:19:57.500',5) ('2021-07-25 02:19:57.600',6) ('2021-07-25 02:19:57.900',7) ('2021-07-25 02:19:58.100',8) ('2021-07-25 02:19:58.300',9) ('2021-07-25 02:19:59.100',10) ('2021-07-25 02:19:59.300',11) ('2021-07-25 02:19:59.500',12) ('2021-07-25 02:19:59.700',13) ('2021-07-25 02:19:59.900',14) ('2021-07-25 02:20:05.000', 20) ('2021-07-25 02:25:00.000', 10000);
|
||||
|
||||
run general/parser/interp_test.sim
|
||||
|
||||
print ================== restart server to commit data into disk
|
||||
|
@ -65,4 +68,4 @@ print ================== server restart completed
|
|||
|
||||
run general/parser/interp_test.sim
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue