Merge branch 'develop' into feature/tagschema

commit 8edc36edc5
@@ -90,6 +90,7 @@ matrix:
 esac

 - os: linux
+dist: bionic
 language: c
 compiler: gcc
 env: COVERITY_SCAN=true

@@ -125,6 +126,7 @@ matrix:
 branch_pattern: coverity_scan

 - os: linux
+dist: bionic
 language: c
 compiler: gcc
 env: ENV_COVER=true

@@ -230,6 +232,7 @@ matrix:
 - make > /dev/null

 - os: linux
+dist: bionic
 language: c
 compiler: clang
 env: DESC="linux/clang build"
@@ -42,35 +42,35 @@ enum {
 static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);

 static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
-int32_t numType = isValidNumber(pToken);
+// int32_t numType = isValidNumber(pToken);
-if (TK_ILLEGAL == numType) {
+// if (TK_ILLEGAL == numType) {
-return numType;
+// return numType;
-}
+// }

 int32_t radix = 10;
-if (numType == TK_HEX) {
+if (pToken->type == TK_HEX) {
 radix = 16;
-} else if (numType == TK_OCT) {
+} else if (pToken->type == TK_OCT) {
 radix = 8;
-} else if (numType == TK_BIN) {
+} else if (pToken->type == TK_BIN) {
 radix = 2;
 }

 errno = 0;
 *value = strtoll(pToken->z, endPtr, radix);

-return numType;
+return pToken->type;
 }

 static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
-int32_t numType = isValidNumber(pToken);
+// int32_t numType = isValidNumber(pToken);
-if (TK_ILLEGAL == numType) {
+// if (TK_ILLEGAL == numType) {
-return numType;
+// return numType;
-}
+// }

 errno = 0;
 *value = strtod(pToken->z, endPtr);
-return numType;
+return pToken->type;
 }

 int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
@@ -23,8 +23,6 @@
 void tscSaveSlowQueryFp(void *handle, void *tmrId);
 void *tscSlowQueryConn = NULL;
 bool tscSlowQueryConnInitialized = false;
-TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
-void *param, void **taos);

 void tscInitConnCb(void *param, TAOS_RES *result, int code) {
 char *sql = param;
@@ -808,18 +808,19 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
 }
 }

-void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo,
+void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
-SFillInfo *pFillInfo) {
 // discard following dataset in the same group and reset the interpolation information
 STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

 STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);

-int16_t prec = tinfo.precision;
+if (pFillInfo != NULL) {
 int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
 int64_t revisedSTime =
-taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
+taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision);

 taosResetFillInfo(pFillInfo, revisedSTime);
+}

 pLocalReducer->discard = true;
 pLocalReducer->discardData->num = 0;

@@ -915,13 +916,12 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
 if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
 /* impose the limitation of output rows on the final result */
 int32_t prevSize = pFinalDataPage->num;
-int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
+int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
+assert(overflow < pRes->numOfRows);
-assert(overFlow < pRes->numOfRows);

 pRes->numOfClauseTotal = pQueryInfo->limit.limit;
-pRes->numOfRows -= overFlow;
+pRes->numOfRows -= overflow;
-pFinalDataPage->num -= overFlow;
+pFinalDataPage->num -= overflow;

 tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);

@@ -988,13 +988,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo

 if (pRes->numOfRows > 0) {
 if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
-int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
+int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
-pRes->numOfRows -= overFlow;
+pRes->numOfRows -= overflow;

 assert(pRes->numOfRows >= 0);

 pRes->numOfClauseTotal = pQueryInfo->limit.limit;
-pFinalDataPage->num -= overFlow;
+pFinalDataPage->num -= overflow;

 /* set remain data to be discarded, and reset the interpolation information */
 savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
@@ -106,9 +106,28 @@ typedef void *SDataRow;
 SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
 void tdFreeDataRow(SDataRow row);
 void tdInitDataRow(SDataRow row, STSchema *pSchema);
-int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset);
 SDataRow tdDataRowDup(SDataRow row);

+static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
+ASSERT(value != NULL);
+int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
+char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
+
+switch (type) {
+case TSDB_DATA_TYPE_BINARY:
+case TSDB_DATA_TYPE_NCHAR:
+*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
+memcpy(ptr, value, varDataTLen(value));
+dataRowLen(row) += varDataTLen(value);
+break;
+default:
+memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
+break;
+}
+
+return 0;
+}
+
 // NOTE: offset here including the header size
 static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
 switch (type) {
@@ -290,32 +290,6 @@ void tdFreeDataRow(SDataRow row) {
 if (row) free(row);
 }

-/**
-* Append a column value to the data row
-* @param type: column type
-* @param bytes: column bytes
-* @param offset: offset in the data row tuple, not including the data row header
-*/
-int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
-ASSERT(value != NULL);
-int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
-char * ptr = POINTER_SHIFT(row, dataRowLen(row));
-
-switch (type) {
-case TSDB_DATA_TYPE_BINARY:
-case TSDB_DATA_TYPE_NCHAR:
-*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
-memcpy(ptr, value, varDataTLen(value));
-dataRowLen(row) += varDataTLen(value);
-break;
-default:
-memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
-break;
-}
-
-return 0;
-}
-
 SDataRow tdDataRowDup(SDataRow row) {
 SDataRow trow = malloc(dataRowLen(row));
 if (trow == NULL) return NULL;
@@ -141,7 +141,7 @@ int32_t rpcDebugFlag = 135;
 int32_t uDebugFlag = 131;
 int32_t debugFlag = 131;
 int32_t sDebugFlag = 135;
-int32_t tsdbDebugFlag = 135;
+int32_t tsdbDebugFlag = 131;

 // the maximum number of results for projection query on super table that are returned from
 // one virtual node, to order according to timestamp
@@ -40,6 +40,7 @@ typedef struct {
 int num; // number of continuous streams
 struct SCqObj *pHead;
 void *dbConn;
+int master;
 pthread_mutex_t mutex;
 } SCqContext;

@@ -58,6 +59,7 @@ typedef struct SCqObj {
 int cqDebugFlag = 135;

 static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
+static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);

 void *cqOpen(void *ahandle, const SCqCfg *pCfg) {

@@ -69,6 +71,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
 pContext->vgId = pCfg->vgId;
 pContext->cqWrite = pCfg->cqWrite;
 pContext->ahandle = ahandle;
+tscEmbedded = 1;

 pthread_mutex_init(&pContext->mutex, NULL);

@@ -84,6 +87,8 @@ void cqClose(void *handle) {
 cqStop(pContext);

 // free all resources
+pthread_mutex_lock(&pContext->mutex);
+
 SCqObj *pObj = pContext->pHead;
 while (pObj) {
 SCqObj *pTemp = pObj;

@@ -91,6 +96,8 @@ void cqClose(void *handle) {
 free(pTemp);
 }

+pthread_mutex_unlock(&pContext->mutex);
+
 pthread_mutex_destroy(&pContext->mutex);

 cTrace("vgId:%d, CQ is closed", pContext->vgId);

@@ -100,28 +107,15 @@ void cqClose(void *handle) {
 void cqStart(void *handle) {
 SCqContext *pContext = handle;
 cTrace("vgId:%d, start all CQs", pContext->vgId);
-if (pContext->dbConn) return;
+if (pContext->dbConn || pContext->master) return;

 pthread_mutex_lock(&pContext->mutex);

-tscEmbedded = 1;
+pContext->master = 1;
-pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
-if (pContext->dbConn == NULL) {
-cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
-pthread_mutex_unlock(&pContext->mutex);
-return;
-}

 SCqObj *pObj = pContext->pHead;
 while (pObj) {
-int64_t lastKey = 0;
+cqCreateStream(pContext, pObj);
-pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
-if (pObj->pStream) {
-pContext->num++;
-cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
-} else {
-cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
-}
 pObj = pObj->next;
 }
@@ -131,10 +125,11 @@ void cqStart(void *handle) {
 void cqStop(void *handle) {
 SCqContext *pContext = handle;
 cTrace("vgId:%d, stop all CQs", pContext->vgId);
-if (pContext->dbConn == NULL) return;
+if (pContext->dbConn == NULL || pContext->master == 0) return;

 pthread_mutex_lock(&pContext->mutex);

+pContext->master = 0;
 SCqObj *pObj = pContext->pHead;
 while (pObj) {
 if (pObj->pStream) {

@@ -176,16 +171,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column
 if (pContext->pHead) pContext->pHead->prev = pObj;
 pContext->pHead = pObj;

-if (pContext->dbConn) {
+cqCreateStream(pContext, pObj);
-int64_t lastKey = 0;
-pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
-if (pObj->pStream) {
-pContext->num++;
-cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
-} else {
-cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr);
-}
-}

 pthread_mutex_unlock(&pContext->mutex);

@@ -218,6 +204,26 @@ void cqDrop(void *handle) {
 pthread_mutex_lock(&pContext->mutex);
 }

+static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
+
+if (pContext->dbConn == NULL) {
+pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
+if (pContext->dbConn == NULL) {
+cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
+}
+return;
+}
+
+int64_t lastKey = 0;
+pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
+if (pObj->pStream) {
+pContext->num++;
+cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
+} else {
+cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
+}
+}
+
 static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
 SCqObj *pObj = (SCqObj *)param;
 SCqContext *pContext = pObj->pContext;
@@ -97,7 +97,9 @@ typedef struct {
 STSCursor cur;
 } SQueryStatusInfo;
+
+#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
 static void setQueryStatus(SQuery *pQuery, int8_t status);

 static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }

 // todo move to utility

@@ -278,6 +280,26 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
 return maxOutput;
 }

+/*
+* the value of number of result needs to be update due to offset value upated.
+*/
+void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
+SQuery *pQuery = pRuntimeEnv->pQuery;
+
+for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
+SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
+
+int16_t functionId = pRuntimeEnv->pCtx[j].functionId;
+if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ ||
+functionId == TSDB_FUNC_TS_DUMMY) {
+continue;
+}
+
+assert(pResInfo->numOfRes > numOfRes);
+pResInfo->numOfRes = numOfRes;
+}
+}
+
 static int32_t getGroupResultId(int32_t groupIndex) {
 int32_t base = 200000;
 return base + (groupIndex * 10000);

@@ -354,9 +376,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {

 bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }

-static bool limitResults(SQInfo *pQInfo) {
+static bool limitResults(SQuery *pQuery) {
-SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
-
 if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) {
 pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total;
 assert(pQuery->rec.rows > 0);

@@ -626,6 +646,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
 int32_t i = 0;
 int64_t skey = TSKEY_INITIAL_VAL;

+// TODO opt performance: get the closed time window here
 for (i = 0; i < pWindowResInfo->size; ++i) {
 SWindowResult *pResult = &pWindowResInfo->pResult[i];
 if (pResult->status.closed) {

@@ -1303,6 +1324,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
 if (numOfRes >= pQuery->rec.threshold) {
 setQueryStatus(pQuery, QUERY_RESBUF_FULL);
 }
+
+if ((pQuery->limit.limit >= 0) && numOfRes >= (pQuery->limit.limit + pQuery->limit.offset)) {
+setQueryStatus(pQuery, QUERY_COMPLETED);
+}
 }

 return numOfRes;
@@ -2408,6 +2433,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {

 char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
 if (tmp == NULL) { // todo handle the oom
+assert(0);
 } else {
 pQuery->sdata[i] = (tFilePage *)tmp;
 }

@@ -2421,7 +2447,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
 }
 }

-qTrace("QInfo: %p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv),
+qTrace("QInfo:%p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv),
 newSize, pRec->capacity, newSize - pRec->rows);

 pRec->capacity = newSize;

@@ -2434,11 +2460,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
 pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
 int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);

-qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
+qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv),
 blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);

-// save last access position
+// while the output buffer is full or limit/offset is applied, query may be paused here
-if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
+if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL | QUERY_COMPLETED)) {
 break;
 }
 }

@@ -3004,11 +3030,13 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *

 // order has change already!
 int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
-if (!QUERY_IS_ASC_QUERY(pQuery)) {
-assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
+// TODO validate the assertion
-} else {
+// if (!QUERY_IS_ASC_QUERY(pQuery)) {
-assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
+// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
-}
+// } else {
+// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
+// }

 pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;

@@ -3087,7 +3115,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
 void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
 SQuery *pQuery = pRuntimeEnv->pQuery;
 for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
-SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order);
+SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);
 }
 }

@@ -3173,30 +3201,38 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
 }

 if (pQuery->rec.rows <= pQuery->limit.offset) {
+qTrace("QInfo:%p skip rows:%d, new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pQuery->rec.rows,
+pQuery->limit.offset - pQuery->rec.rows);
+
 pQuery->limit.offset -= pQuery->rec.rows;
 pQuery->rec.rows = 0;

 resetCtxOutputBuf(pRuntimeEnv);

-// clear the buffer is full flag if exists
+// clear the buffer full flag if exists
-pQuery->status &= (~QUERY_RESBUF_FULL);
+CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL);
 } else {
-int32_t numOfSkip = (int32_t) pQuery->limit.offset;
+int64_t numOfSkip = pQuery->limit.offset;
 pQuery->rec.rows -= numOfSkip;
+pQuery->limit.offset = 0;
+
+qTrace("QInfo:%p skip row:%"PRId64", new offset:%d, numOfRows remain:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), numOfSkip,
+0, pQuery->rec.rows);
+
 for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
 int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
 int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;

-memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
+memmove(pQuery->sdata[i]->data, (char*) pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
-pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
+pRuntimeEnv->pCtx[i].aOutputBuf = ((char*) pQuery->sdata[i]->data) + pQuery->rec.rows * bytes;

 if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
-pRuntimeEnv->pCtx[i].ptsOutputBuf += TSDB_KEYSIZE * numOfSkip;
+pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
 }
 }

-pQuery->limit.offset = 0;
+updateNumOfResult(pRuntimeEnv, pQuery->rec.rows);
 }
 }
@@ -3205,7 +3241,7 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
 pQuery->status = status;
 } else {
 // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
-pQuery->status &= (~QUERY_NOT_COMPLETED);
+CLEAR_QUERY_STATUS(pQuery, QUERY_NOT_COMPLETED);
 pQuery->status |= status;
 }
 }

@@ -3957,7 +3993,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc

 int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);

-qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
+qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv),
 pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes);
 }

@@ -4075,7 +4111,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
 int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock);
 pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index

-qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d",
+qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d",
 GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
 return true;
 } else { // do nothing

@@ -4350,10 +4386,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {

 SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);

-TSKEY nextKey = blockInfo.window.skey;
 if (!isIntervalQuery(pQuery)) {
-setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey);
+int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
+setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, blockInfo.window.ekey + step);
 } else { // interval query
+TSKEY nextKey = blockInfo.window.skey;
 setIntervalQueryRange(pQInfo, nextKey);
 int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);

@@ -4532,8 +4569,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
 continue;
 }

-// SPointInterpoSupporter pointInterpSupporter = {0};
-
 // TODO handle the limit offset problem
 if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
 // skipBlocks(pRuntimeEnv);

@@ -4544,12 +4579,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
 }

 scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
-
-pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
 skipResults(pRuntimeEnv);

 // the limitation of output result is reached, set the query completed
-if (limitResults(pQInfo)) {
+if (limitResults(pQuery)) {
 pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
 break;
 }

@@ -4578,18 +4611,15 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
 break;
 }

-} else { // forward query range
+} else {
-pQuery->window.skey = pQuery->current->lastKey;
-
 // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
 if (pQuery->rec.rows == 0) {
 assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
 continue;
 } else {
-// pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
+// buffer is full, wait for the next round to retrieve data from current meter
-// // buffer is full, wait for the next round to retrieve data from current meter
+assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
-// assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
+break;
-// break;
 }
 }
 }

@@ -4633,10 +4663,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
 copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
 }

-pQuery->rec.total += pQuery->rec.rows;
-
 qTrace(
-"QInfo %p, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%"PRId64", offset:%" PRId64,
+"QInfo %p numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%"PRId64", offset:%" PRId64,
 pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total,
 pQuery->limit.offset);
 }

@@ -4809,7 +4837,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
 pQuery->rec.rows = getNumOfResult(pRuntimeEnv);

 skipResults(pRuntimeEnv);
-limitResults(pQInfo);
+limitResults(pQuery);
 }

 static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {

@@ -4857,7 +4885,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
 resetCtxOutputBuf(pRuntimeEnv);
 }

-limitResults(pQInfo);
+limitResults(pQuery);
 if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
 qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
 pQuery->current->lastKey, pQuery->window.ekey);

@@ -4935,7 +4963,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {

 // the offset is handled at prepare stage if no interpolation involved
 if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) {
-limitResults(pQInfo);
+limitResults(pQuery);
 break;
 } else {
 TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,

@@ -4947,7 +4975,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {

 qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
 if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
-limitResults(pQInfo);
+limitResults(pQuery);
 break;
 }

@@ -4982,7 +5010,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {

 qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
 if (pQuery->rec.rows > 0) {
-limitResults(pQInfo);
+limitResults(pQuery);
 }

 qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
@@ -282,11 +282,7 @@ int tSQLKeywordCode(const char* z, int n) {
 }

 SKeyword** pKey = (SKeyword**)taosHashGet(KeywordHashTable, key, n);
-if (pKey != NULL) {
+return (pKey != NULL)? (*pKey)->type:TK_ID;
-return (*pKey)->type;
-} else {
-return TK_ID;
-}
 }

 /*

@@ -594,31 +590,28 @@ SSQLToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
 while (1) {
 *i += t0.n;

-bool hasComma = false;
+int32_t numOfComma = 0;
-while ((str[*i] == ' ' || str[*i] == '\n' || str[*i] == '\r' || str[*i] == '\t' || str[*i] == '\f')
+char t = str[*i];
-|| str[*i] == ',') {
+while (t == ' ' || t == '\n' || t == '\r' || t == '\t' || t == '\f' || t == ',') {
-if (str[*i] == ',') {
+if (t == ',' && (++numOfComma > 1)) { // comma only allowed once
-if (false == hasComma) {
-hasComma = true;
-} else { // comma only allowed once
 t0.n = 0;
 return t0;
 }
-}
-(*i)++;
+t = str[++(*i)];
 }

 t0.n = tSQLGetToken(&str[*i], &t0.type);

-bool ignoreFlag = false;
+bool ignore = false;
 for (uint32_t k = 0; k < numOfIgnoreToken; k++) {
 if (t0.type == ignoreTokenTypes[k]) {
-ignoreFlag = true;
+ignore = true;
 break;
 }
 }

-if (!ignoreFlag) {
+if (!ignore) {
 break;
 }
 }

@@ -662,114 +655,4 @@ SSQLToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
 return t0;
 }

-FORCE_INLINE bool isKeyWord(const char* z, int32_t len) { return (tSQLKeywordCode((char*)z, len) != TK_ID); }
+bool isKeyWord(const char* z, int32_t len) { return (tSQLKeywordCode((char*)z, len) != TK_ID); }
-
-FORCE_INLINE bool isNumber(const SSQLToken* pToken) {
-return (pToken->type == TK_INTEGER || pToken->type == TK_FLOAT || pToken->type == TK_HEX || pToken->type == TK_BIN);
-}
-
-int32_t isValidNumber(const SSQLToken* pToken) {
-const char* z = pToken->z;
-int32_t type = TK_ILLEGAL;
-
-int32_t i = 0;
-for(; i < pToken->n; ++i) {
-switch (z[i]) {
-case '+':
-case '-': {
-break;
-}
-case '.': {
-/*
-* handle the the float number with out integer part
-* .123
-* .123e4
-*/
-if (!isdigit(z[i+1])) {
-return TK_ILLEGAL;
-}
-
-for (i += 2; isdigit(z[i]); i++) {
-}
-
-if ((z[i] == 'e' || z[i] == 'E') &&
-(isdigit(z[i + 1]) || ((z[i + 1] == '+' || z[i + 1] == '-') && isdigit(z[i + 2])))) {
-i += 2;
-while (isdigit(z[i])) {
-i++;
-}
-}
-
-type = TK_FLOAT;
-goto _end;
-}
-
-case '0': {
-char next = z[i + 1];
-if (next == 'b') { // bin number
-type = TK_BIN;
-for (i += 2; (z[i] == '0' || z[i] == '1'); ++i) {
-}
-
-goto _end;
-} else if (next == 'x') { //hex number
-type = TK_HEX;
-for (i += 2; isdigit(z[i]) || (z[i] >= 'a' && z[i] <= 'f') || (z[i] >= 'A' && z[i] <= 'F'); ++i) {
-}
-
-goto _end;
-}
-}
-case '1':
-case '2':
-case '3':
-case '4':
-case '5':
-case '6':
-case '7':
-case '8':
-case '9': {
-type = TK_INTEGER;
-for (; isdigit(z[i]); i++) {
-}
-
-int32_t seg = 0;
-while (z[i] == '.' && isdigit(z[i + 1])) {
-i += 2;
-
-while (isdigit(z[i])) {
-i++;
-}
-
-seg++;
-type = TK_FLOAT;
-}
-
-if (seg > 1) {
-return TK_ILLEGAL;
-}
-
-if ((z[i] == 'e' || z[i] == 'E') &&
-(isdigit(z[i + 1]) || ((z[i + 1] == '+' || z[i + 1] == '-') && isdigit(z[i + 2])))) {
-i += 2;
-while (isdigit(z[i])) {
-i++;
-}
-
-type = TK_FLOAT;
-}
-
-goto _end;
-}
-default:
-return TK_ILLEGAL;
-}
-}
-
-_end:
-if (i < pToken->n) {
-return TK_ILLEGAL;
-} else {
-return type;
-}
-}
@@ -16,7 +16,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx)
 static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
 static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
 static int tsdbEstimateTableEncodeSize(STable *pTable);
-static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable);
+static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);

 /**
 * Encode a TSDB table object as a binary content

@@ -414,7 +414,7 @@ int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {

 tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name),
 tableId.tid, tableId.uid);
-if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1;
+if (tsdbRemoveTableFromMeta(pMeta, pTable, true) < 0) return -1;

 return 0;

@@ -503,7 +503,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
 return 0;
 }

-static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
+static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx) {
 if (pTable->type == TSDB_SUPER_TABLE) {
 SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
 while (tSkipListIterNext(pIter)) {

@@ -512,7 +512,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {

 ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE);

-tsdbRemoveTableFromMeta(pMeta, tTable);
+tsdbRemoveTableFromMeta(pMeta, tTable, false);
 }

 tSkipListDestroyIter(pIter);

@@ -528,7 +528,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
 }
 } else {
 pMeta->tables[pTable->tableId.tid] = NULL;
-if (pTable->type == TSDB_CHILD_TABLE) {
+if (pTable->type == TSDB_CHILD_TABLE && rmFromIdx) {
 tsdbRemoveTableFromIndex(pMeta, pTable);
 }
@@ -395,6 +395,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo

 SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
 if (compIndex->len == 0 || compIndex->numOfBlocks == 0) { // no data block in this file, try next file
+pCheckInfo->numOfBlocks = 0;
 continue;//no data blocks in the file belongs to pCheckInfo->pTable
 } else {
 if (pCheckInfo->compSize < compIndex->len) {

@@ -544,8 +545,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
 /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);

 TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
-if (pCheckInfo->iter != NULL) {
+if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) {
 SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
+
 SDataRow row = SL_GET_NODE_DATA(node);
 k1 = dataRowKey(row);

@@ -560,8 +562,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
 }
 }

-if (pCheckInfo->iiter != NULL) {
+if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) {
 SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
+
 SDataRow row = SL_GET_NODE_DATA(node);
 k2 = dataRowKey(row);

@@ -582,6 +585,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
 mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
 } else {
 pQueryHandle->realNumOfRows = binfo.rows;
+
+cur->rows = binfo.rows;
+cur->win = binfo.window;
+cur->mixBlock = false;
+cur->blockCompleted = true;
+cur->lastKey = binfo.window.ekey + (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1);
 }
 }
 } else { //desc order

@@ -858,6 +867,7 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
 }
 }

+pos += (end - start + 1) * step;
 cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
 ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)));

@@ -912,6 +922,9 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo

 int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
 int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order);
+if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
+tSkipListIterNext(pCheckInfo->iter);
+}

 int32_t start = -1;
 if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
@@ -21,6 +21,8 @@ extern "C" {
 #endif

 #include "os.h"
+#include "tutil.h"
+#include "ttokendef.h"

 #define TK_SPACE 200
 #define TK_COMMENT 201

@@ -74,14 +76,117 @@ bool isKeyWord(const char *z, int32_t len);
 * @param pToken
 * @return
 */
-bool isNumber(const SSQLToken *pToken);
+#define isNumber(tk) \
+((tk)->type == TK_INTEGER || (tk)->type == TK_FLOAT || (tk)->type == TK_HEX || (tk)->type == TK_BIN)
+

 /**
 * check if it is a token or not
 * @param pToken
 * @return token type, if it is not a number, TK_ILLEGAL will return
 */
-int32_t isValidNumber(const SSQLToken* pToken);
+static FORCE_INLINE int32_t isValidNumber(const SSQLToken* pToken) {
+const char* z = pToken->z;
+int32_t type = TK_ILLEGAL;
+
+int32_t i = 0;
+for(; i < pToken->n; ++i) {
+switch (z[i]) {
+case '+':
+case '-': {
+break;
+}
+
+case '.': {
+/*
+* handle the the float number with out integer part
+* .123
+* .123e4
+*/
+if (!isdigit(z[i+1])) {
+return TK_ILLEGAL;
+}
+
+for (i += 2; isdigit(z[i]); i++) {
+}
+
+if ((z[i] == 'e' || z[i] == 'E') &&
+(isdigit(z[i + 1]) || ((z[i + 1] == '+' || z[i + 1] == '-') && isdigit(z[i + 2])))) {
+i += 2;
+while (isdigit(z[i])) {
+i++;
+}
+}
+
+type = TK_FLOAT;
+goto _end;
+}
+
+case '0': {
+char next = z[i + 1];
+if (next == 'b') { // bin number
+type = TK_BIN;
+for (i += 2; (z[i] == '0' || z[i] == '1'); ++i) {
+}
+
+goto _end;
+} else if (next == 'x') { //hex number
+type = TK_HEX;
+for (i += 2; isdigit(z[i]) || (z[i] >= 'a' && z[i] <= 'f') || (z[i] >= 'A' && z[i] <= 'F'); ++i) {
+}
+
+goto _end;
+}
+}
+case '1':
+case '2':
+case '3':
+case '4':
+case '5':
+case '6':
+case '7':
+case '8':
+case '9': {
+type = TK_INTEGER;
+for (; isdigit(z[i]); i++) {
+}
+
+int32_t seg = 0;
+while (z[i] == '.' && isdigit(z[i + 1])) {
+i += 2;
+
+while (isdigit(z[i])) {
+i++;
+}
+
+seg++;
+type = TK_FLOAT;
+}
+
+if (seg > 1) {
+return TK_ILLEGAL;
+}
+
+if ((z[i] == 'e' || z[i] == 'E') &&
+(isdigit(z[i + 1]) || ((z[i + 1] == '+' || z[i + 1] == '-') && isdigit(z[i + 2])))) {
+i += 2;
+while (isdigit(z[i])) {
+i++;
+}
+
+type = TK_FLOAT;
+}
+
+goto _end;
+}
+default:
+return TK_ILLEGAL;
+}
+}
+
+_end:
+return (i < pToken->n)? TK_ILLEGAL:type;
+}

 #ifdef __cplusplus
 }
@ -102,7 +102,32 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode);
|
||||||
* @param hashVal hash value by hash function
|
* @param hashVal hash value by hash function
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
static SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t *hashVal);
|
FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t *hashVal) {
|
||||||
|
uint32_t hash = (*pHashObj->hashFp)(key, keyLen);
|
||||||
|
|
||||||
|
int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
|
||||||
|
SHashEntry *pEntry = pHashObj->hashList[slot];
|
||||||
|
|
||||||
|
SHashNode *pNode = pEntry->next;
|
||||||
|
while (pNode) {
|
||||||
|
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pNode = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode) {
|
||||||
|
assert(HASH_INDEX(pNode->hashVal, pHashObj->capacity) == slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the calculated hash value, to avoid calculating it again in other functions
|
||||||
|
if (hashVal != NULL) {
|
||||||
|
*hashVal = hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pNode;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resize the hash list if the threshold is reached
|
* Resize the hash list if the threshold is reached
|
||||||
|
@ -438,33 +463,6 @@ void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
  }
}

SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t *hashVal) {
  uint32_t hash = (*pHashObj->hashFp)(key, keyLen);

  int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
  SHashEntry *pEntry = pHashObj->hashList[slot];

  SHashNode *pNode = pEntry->next;
  while (pNode) {
    if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
      break;
    }

    pNode = pNode->next;
  }

  if (pNode) {
    assert(HASH_INDEX(pNode->hashVal, pHashObj->capacity) == slot);
  }

  // return the calculated hash value, to avoid calculating it again in other functions
  if (hashVal != NULL) {
    *hashVal = hash;
  }

  return pNode;
}

void taosHashTableResize(SHashObj *pHashObj) {
  if (pHashObj->size < pHashObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
    return;
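Taken together, the two hunks above move doGetNodeFromHashTable to the top of the file as a FORCE_INLINE function and delete the old standalone definition; the lookup itself is ordinary separate chaining: hash the key once, map the hash to a bucket, then walk that bucket's list comparing key length first and raw bytes second, and hand the computed hash back so the caller does not recompute it on insert. The following is a minimal, self-contained sketch of the same pattern; the Node, Table, table_get, and djb2 names are illustrative stand-ins, not TDengine's SHashObj API.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct Node {
  struct Node *next;     /* chain link within one bucket */
  uint32_t     keyLen;
  const void  *key;
  void        *value;
} Node;

typedef struct {
  Node   **buckets;                                   /* array of bucket heads */
  uint32_t capacity;                                  /* number of buckets */
  uint32_t (*hashFp)(const void *key, uint32_t len);  /* pluggable hash function */
} Table;

/* Separate-chaining lookup: hash once, pick a slot, walk the chain comparing
 * key length first and raw bytes second; optionally return the hash value. */
static Node *table_get(Table *t, const void *key, uint32_t keyLen, uint32_t *hashVal) {
  uint32_t hash = t->hashFp(key, keyLen);
  uint32_t slot = hash % t->capacity;

  Node *p = t->buckets[slot];
  while (p != NULL && !(p->keyLen == keyLen && memcmp(p->key, key, keyLen) == 0)) {
    p = p->next;
  }

  if (hashVal != NULL) {
    *hashVal = hash;  /* let the caller reuse the hash, e.g. for a subsequent insert */
  }
  return p;
}

/* Any reasonable string hash works for the illustration. */
static uint32_t djb2(const void *key, uint32_t len) {
  const unsigned char *p = (const unsigned char *)key;
  uint32_t h = 5381;
  for (uint32_t i = 0; i < len; ++i) h = h * 33u + p[i];
  return h;
}

int main(void) {
  Node  node       = {NULL, 3, "abc", "hello"};
  Node *buckets[4] = {NULL, NULL, NULL, NULL};
  Table t          = {buckets, 4, djb2};

  buckets[djb2("abc", 3) % t.capacity] = &node;  /* place one node in its bucket */

  uint32_t h = 0;
  Node *found = table_get(&t, "abc", 3, &h);
  printf("found=%s hash=%u\n", found != NULL ? (const char *)found->value : "(none)", h);
  return 0;
}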
@ -10,7 +10,7 @@
#include "hashfunc.h"
#include "tutil.h"

#define ROTL32(x, r) ((x) << (r) | (x) >> (32 - (r)))
#define ROTL32(x, r) ((x) << (r) | (x) >> (32u - (r)))

#define FMIX32(h) \
do { \

@ -21,11 +21,11 @@
  (h) ^= (h) >> 16; \
} while (0)

static void MurmurHash3_32_s(const void *key, int len, uint32_t seed, void *out) {
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
  const uint8_t *data = (const uint8_t *)key;
  const int nblocks = len / 4;
  const int nblocks = len >> 2u;

  uint32_t h1 = seed;
  uint32_t h1 = 0x12345678;

  const uint32_t c1 = 0xcc9e2d51;
  const uint32_t c2 = 0x1b873593;

@ -36,11 +36,11 @@ static void MurmurHash3_32_s(const void *key, int len, uint32_t seed, void *out)
    uint32_t k1 = blocks[i];

    k1 *= c1;
    k1 = ROTL32(k1, 15);
    k1 = ROTL32(k1, 15u);
    k1 *= c2;

    h1 ^= k1;
    h1 = ROTL32(h1, 13);
    h1 = ROTL32(h1, 13u);
    h1 = h1 * 5 + 0xe6546b64;
  }

@ -48,7 +48,7 @@ static void MurmurHash3_32_s(const void *key, int len, uint32_t seed, void *out)

  uint32_t k1 = 0;

  switch (len & 3) {
  switch (len & 3u) {
    case 3:
      k1 ^= tail[2] << 16;
    case 2:

@ -56,7 +56,7 @@ static void MurmurHash3_32_s(const void *key, int len, uint32_t seed, void *out)
    case 1:
      k1 ^= tail[0];
      k1 *= c1;
      k1 = ROTL32(k1, 15);
      k1 = ROTL32(k1, 15u);
      k1 *= c2;
      h1 ^= k1;
  };

@ -65,16 +65,7 @@ static void MurmurHash3_32_s(const void *key, int len, uint32_t seed, void *out)

  FMIX32(h1);

  *(uint32_t *)out = h1;
  return h1;
}

uint32_t MurmurHash3_32(const char *key, uint32_t len) {
  const int32_t hashSeed = 0x12345678;

  uint32_t val = 0;
  MurmurHash3_32_s(key, len, hashSeed, &val);

  return val;
}

uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint32_t *)key; }
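In the hunks above, the private MurmurHash3_32_s helper and its wrapper are folded into a single MurmurHash3_32 that hard-codes the 0x12345678 seed and returns the hash directly instead of writing it through an out pointer. Since the diff only shows fragments, here is a self-contained, runnable version of the same MurmurHash3 x86 32-bit routine assembled for illustration; the parts not visible above (the h1 ^= len step and the FMIX32 constants) are taken from the public-domain MurmurHash3 reference, and the murmur3_32 name, the capacity value, and the main() driver are illustrative only.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define ROTL32(x, r) ((x) << (r) | (x) >> (32u - (r)))

/* MurmurHash3 x86 32-bit with a fixed seed, assembled from the fragments above. */
static uint32_t murmur3_32(const char *key, uint32_t len) {
  const uint8_t *data    = (const uint8_t *)key;
  const int      nblocks = (int)(len >> 2u);

  uint32_t h1 = 0x12345678;  /* fixed seed, as in the new MurmurHash3_32 */

  const uint32_t c1 = 0xcc9e2d51;
  const uint32_t c2 = 0x1b873593;

  /* body: mix every full 4-byte block into h1 */
  const uint32_t *blocks = (const uint32_t *)(data + nblocks * 4);
  for (int i = -nblocks; i; i++) {
    uint32_t k1 = blocks[i];

    k1 *= c1;
    k1 = ROTL32(k1, 15u);
    k1 *= c2;

    h1 ^= k1;
    h1 = ROTL32(h1, 13u);
    h1 = h1 * 5 + 0xe6546b64;
  }

  /* tail: fold in the 1-3 trailing bytes */
  const uint8_t *tail = data + nblocks * 4;
  uint32_t k1 = 0;
  switch (len & 3u) {
    case 3: k1 ^= (uint32_t)tail[2] << 16; /* fall through */
    case 2: k1 ^= (uint32_t)tail[1] << 8;  /* fall through */
    case 1: k1 ^= tail[0];
            k1 *= c1; k1 = ROTL32(k1, 15u); k1 *= c2; h1 ^= k1;
  }

  /* finalization: FMIX32 expanded inline (constants from the reference) */
  h1 ^= len;
  h1 ^= h1 >> 16; h1 *= 0x85ebca6b;
  h1 ^= h1 >> 13; h1 *= 0xc2b2ae35;
  h1 ^= h1 >> 16;

  return h1;
}

int main(void) {
  const char *keys[] = {"meter_1001", "meter_1002", "meter_1001"};
  const uint32_t capacity = 16;  /* illustrative bucket count */

  for (int i = 0; i < 3; ++i) {
    uint32_t h = murmur3_32(keys[i], (uint32_t)strlen(keys[i]));
    /* equal keys always hash to the same value and therefore the same slot */
    printf("%-10s -> 0x%08x slot=%u\n", keys[i], h, h % capacity);
  }
  return 0;
}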
@ -11,6 +11,19 @@
4. pip install src/connector/python/linux/python2 ; pip3 install
   src/connector/python/linux/python3

> Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since Python2 is no longer officially supported by the Python
> Software Foundation as of January 1, 2020, it is recommended that subsequent
> test case development be guaranteed to run correctly on Python3.

> For Python2, please consider being compatible if appropriate without
> additional burden.
>
> If you use some new Linux distribution like Ubuntu 20.04 which already does
> not include Python2, please do not install Python2-related packages.
>
> <https://nakedsecurity.sophos.com/2020/01/03/python-is-dead-long-live-python/>

### How to run Python test suite

1. cd \<TDengine\>/tests/pytest

@ -211,13 +224,6 @@ def checkAffectedRows(self, expectAffectedRows):

...

> Note: Both Python2 and Python3 are currently supported by the Python test
> case. Since Python2 is no longer officially supported by January 1, 2020, it
> is recommended that subsequent test case development be guaranteed to run
> correctly on Python3. For Python2, please consider being compatible if
> appropriate without additional
> burden. <https://nakedsecurity.sophos.com/2020/01/03/python-is-dead-long-live-python/>

### CI submission adoption principle.

- Every commit / PR compilation must pass. Currently, the warning is treated
@ -65,7 +65,7 @@ sleep 2000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed

run general/parser/limit1_tb.sim
#run general/parser/limit1_tb.sim
run general/parser/limit1_stb.sim

system sh/exec.sh -n dnode1 -s stop -x SIGINT
@ -111,16 +111,16 @@ endi
if $data09 != nchar0 then
  return -1
endi
if $data11 != NULL then
if $data11 != null then
  return -1
endi
if $data12 != NULL then
if $data12 != null then
  return -1
endi
if $data13 != NULL then
if $data13 != null then
  return -1
endi
if $data14 != NULL then
if $data14 != null then
  return -1
endi

@ -543,7 +543,7 @@ endi
if $data14 != 8.000000000 then
  return -1
endi
if $data21 != NULL then
if $data21 != null then
  return -1
endi

@ -613,7 +613,7 @@ endi
if $data21 != 7.000000000 then
  return -1
endi
if $data31 != NULL then
if $data31 != null then
  return -1
endi
sql select avg(c1), avg(c2), avg(c3), avg(c4), avg(c5), avg(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 3 offset 1
@ -33,9 +33,9 @@ echo "### run Python script ###"
cd ../pytest

if [ "$1" == "cron" ]; then
  ./fulltest.sh > /dev/null | tee pytest-out.txt
  ./fulltest.sh 2>&1 | grep 'successfully executed\|failed\|fault' | grep -v 'default'| tee pytest-out.txt
else
  ./smoketest.sh > /dev/null | tee pytest-out.txt
  ./smoketest.sh 2>&1 | grep 'successfully executed\|failed\|fault' | grep -v 'default'| tee pytest-out.txt
fi
totalPySuccess=`grep 'successfully executed' pytest-out.txt | wc -l`