Merge remote-tracking branch 'origin/develop' into feature/vnode

This commit is contained in:
Shengliang Guan 2020-07-11 07:45:02 +00:00
commit 164aed70b9
29 changed files with 466 additions and 425 deletions

View File

@ -160,7 +160,9 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo); void tscFieldInfoClear(SFieldInfo* pFieldInfo);
int32_t tscNumOfFields(SQueryInfo* pQueryInfo);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2); int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);

View File

@ -412,7 +412,44 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column); //void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType;
int32_t bytes = pInfo->pSqlExpr->resBytes;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE);
if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL;
} else {
pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
}
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
*(pData + realLen + VARSTR_HEADER_SIZE) = 0;
}
pRes->length[columnIndex] = realLen;
} else {
assert(bytes == tDataTypeDesc[type].nSize);
if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL;
} else {
pRes->tsrow[columnIndex] = pData;
}
pRes->length[columnIndex] = bytes;
}
}
extern void * tscCacheHandle; extern void * tscCacheHandle;
extern void * tscTmr; extern void * tscTmr;

View File

@ -2952,10 +2952,14 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size); INC_INIT_VAL(pCtx, pCtx->size);
assert(pCtx->inputBytes == pCtx->outputBytes); assert(pCtx->inputBytes == pCtx->outputBytes);
for (int32_t i = 0; i < pCtx->size; ++i) { tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType, true);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType, true); char* data = pCtx->aOutputBuf;
pCtx->aOutputBuf += pCtx->outputBytes;
// directly copy from the first one
for (int32_t i = 1; i < pCtx->size; ++i) {
memmove(pCtx->aOutputBuf, data, pCtx->outputBytes);
pCtx->aOutputBuf += pCtx->outputBytes; pCtx->aOutputBuf += pCtx->outputBytes;
} }
} }
@ -3941,7 +3945,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
tsBufFlush(pTSbuf); tsBufFlush(pTSbuf);
strcpy(pCtx->aOutputBuf, pTSbuf->path); strcpy(pCtx->aOutputBuf, pTSbuf->path);
tsBufDestory(pTSbuf); tsBufDestroy(pTSbuf);
doFinalizer(pCtx); doFinalizer(pCtx);
} }

View File

@ -175,7 +175,6 @@ static int32_t handlePassword(SSqlCmd* pCmd, SSQLToken* pPwd) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo handle memory leak in error handle function
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pInfo == NULL || pSql == NULL || pSql->signature != pSql) { if (pInfo == NULL || pSql == NULL || pSql->signature != pSql) {
return TSDB_CODE_TSC_APP_ERROR; return TSDB_CODE_TSC_APP_ERROR;

View File

@ -403,7 +403,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
sem_wait(&pSql->rspSem); sem_wait(&pSql->rspSem);
} }
return doSetResultRowData(pSql, true); return doSetResultRowData(pSql, true);
} }

View File

@ -152,8 +152,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufFlush(output1); tsBufFlush(output1);
tsBufFlush(output2); tsBufFlush(output2);
tsBufDestory(pSupporter1->pTSBuf); tsBufDestroy(pSupporter1->pTSBuf);
tsBufDestory(pSupporter2->pTSBuf); tsBufDestroy(pSupporter2->pTSBuf);
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal, "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal,
@ -762,7 +762,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf); tsBufDestroy(pBuf);
} }
// continue to retrieve ts-comp data from vnode // continue to retrieve ts-comp data from vnode
@ -2106,9 +2106,9 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId)
void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
if (pRes->completed) { if (pRes->completed) {
tfree(pRes->tsrow); tfree(pRes->tsrow);
@ -2116,29 +2116,31 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
return pRes->tsrow; return pRes->tsrow;
} }
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow); tfree(pRes->tsrow);
return pRes->tsrow; return pRes->tsrow;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscNumOfFields(pQueryInfo); size_t size = tscNumOfFields(pQueryInfo);
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
SFieldSupInfo* pSup = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i); SFieldSupInfo* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, i);
if (pSup->pSqlExpr != NULL) { if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
} }
// primary key column cannot be null in interval query, no need to check // primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->intervalTime > 0) { if (i == 0 && pQueryInfo->intervalTime > 0) {
continue; continue;
} }
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pFields, i);
transferNcharData(pSql, i, pField); if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
transferNcharData(pSql, i, pField);
}
// calculate the result from several other columns // calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) { if (pSup->pArithExprInfo != NULL) {
if (pRes->pArithSup == NULL) { if (pRes->pArithSup == NULL) {
@ -2148,10 +2150,10 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
sas->numOfCols = tscSqlExprNumOfExprs(pQueryInfo); sas->numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
sas->exprList = pQueryInfo->exprList; sas->exprList = pQueryInfo->exprList;
sas->data = calloc(sas->numOfCols, POINTER_BYTES); sas->data = calloc(sas->numOfCols, POINTER_BYTES);
pRes->pArithSup = sas; pRes->pArithSup = sas;
} }
if (pRes->buffer[i] == NULL) { if (pRes->buffer[i] == NULL) {
TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pRes->buffer[i] = malloc(field->bytes); pRes->buffer[i] = malloc(field->bytes);
@ -2161,13 +2163,13 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes; pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
} }
tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup, tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
TSDB_ORDER_ASC, getArithemicInputSrc); TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i]; pRes->tsrow[i] = pRes->buffer[i];
} }
} }
pRes->row++; // index increase one-step pRes->row++; // index increase one-step
return pRes->tsrow; return pRes->tsrow;
} }

View File

@ -794,7 +794,7 @@ SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
} }
SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index) { SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index) {
return taosArrayGet(pFieldInfo->pSupportInfo, index); return TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, index);
} }
SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) { SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) {
@ -858,11 +858,9 @@ void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) {
} }
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) { TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) {
return taosArrayGet(pFieldInfo->pFields, index); return TARRAY_GET_ELEM(pFieldInfo->pFields, index);
} }
int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, index); SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, index);
assert(pInfo != NULL); assert(pInfo != NULL);
@ -1546,7 +1544,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
pQueryInfo->groupbyExpr.columnInfo = NULL; pQueryInfo->groupbyExpr.columnInfo = NULL;
} }
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
tfree(pQueryInfo->fillVal); tfree(pQueryInfo->fillVal);
} }
@ -2085,42 +2083,42 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
} }
} }
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { //void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex); // SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL); // assert(pInfo->pSqlExpr != NULL);
//
int32_t type = pInfo->pSqlExpr->resType; // int32_t type = pInfo->pSqlExpr->resType;
int32_t bytes = pInfo->pSqlExpr->resBytes; // int32_t bytes = pInfo->pSqlExpr->resBytes;
//
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; // char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
//
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { // if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
int32_t realLen = varDataLen(pData); // int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE); // assert(realLen <= bytes - VARSTR_HEADER_SIZE);
//
if (isNull(pData, type)) { // if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL; // pRes->tsrow[columnIndex] = NULL;
} else { // } else {
pRes->tsrow[columnIndex] = ((tstr*)pData)->data; // pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
} // }
//
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor // if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
*(pData + realLen + VARSTR_HEADER_SIZE) = 0; // *(pData + realLen + VARSTR_HEADER_SIZE) = 0;
} // }
//
pRes->length[columnIndex] = realLen; // pRes->length[columnIndex] = realLen;
} else { // } else {
assert(bytes == tDataTypeDesc[type].nSize); // assert(bytes == tDataTypeDesc[type].nSize);
//
if (isNull(pData, type)) { // if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL; // pRes->tsrow[columnIndex] = NULL;
} else { // } else {
pRes->tsrow[columnIndex] = pData; // pRes->tsrow[columnIndex] = pData;
} // }
//
pRes->length[columnIndex] = bytes; // pRes->length[columnIndex] = bytes;
} // }
} //}
void* malloc_throw(size_t size) { void* malloc_throw(size_t size) {
void* p = malloc(size); void* p = malloc(size);

View File

@ -555,7 +555,7 @@ static void doInitGlobalConfig() {
cfg.ptr = &tsMinIntervalTime; cfg.ptr = &tsMinIntervalTime;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 10; cfg.minValue = 1;
cfg.maxValue = 1000000; cfg.maxValue = 1000000;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_MS; cfg.unitType = TAOS_CFG_UTYPE_MS;

View File

@ -82,7 +82,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
} }
int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime; int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime;
if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) { if (!(timeUnit == 'u' || timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
/* /*
* here we revised the start time of day according to the local time zone, * here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided. * but in case of DST, the start time of one day need to be dynamically decided.

View File

@ -367,31 +367,31 @@ bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_NCHAR; return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_NCHAR;
} }
bool isNull(const char *val, int32_t type) { //bool isNull(const char *val, int32_t type) {
switch (type) { // switch (type) {
case TSDB_DATA_TYPE_BOOL: // case TSDB_DATA_TYPE_BOOL:
return *(uint8_t *)val == TSDB_DATA_BOOL_NULL; // return *(uint8_t *)val == TSDB_DATA_BOOL_NULL;
case TSDB_DATA_TYPE_TINYINT: // case TSDB_DATA_TYPE_TINYINT:
return *(uint8_t *)val == TSDB_DATA_TINYINT_NULL; // return *(uint8_t *)val == TSDB_DATA_TINYINT_NULL;
case TSDB_DATA_TYPE_SMALLINT: // case TSDB_DATA_TYPE_SMALLINT:
return *(uint16_t *)val == TSDB_DATA_SMALLINT_NULL; // return *(uint16_t *)val == TSDB_DATA_SMALLINT_NULL;
case TSDB_DATA_TYPE_INT: // case TSDB_DATA_TYPE_INT:
return *(uint32_t *)val == TSDB_DATA_INT_NULL; // return *(uint32_t *)val == TSDB_DATA_INT_NULL;
case TSDB_DATA_TYPE_BIGINT: // case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: // case TSDB_DATA_TYPE_TIMESTAMP:
return *(uint64_t *)val == TSDB_DATA_BIGINT_NULL; // return *(uint64_t *)val == TSDB_DATA_BIGINT_NULL;
case TSDB_DATA_TYPE_FLOAT: // case TSDB_DATA_TYPE_FLOAT:
return *(uint32_t *)val == TSDB_DATA_FLOAT_NULL; // return *(uint32_t *)val == TSDB_DATA_FLOAT_NULL;
case TSDB_DATA_TYPE_DOUBLE: // case TSDB_DATA_TYPE_DOUBLE:
return *(uint64_t *)val == TSDB_DATA_DOUBLE_NULL; // return *(uint64_t *)val == TSDB_DATA_DOUBLE_NULL;
case TSDB_DATA_TYPE_NCHAR: // case TSDB_DATA_TYPE_NCHAR:
return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL; // return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
case TSDB_DATA_TYPE_BINARY: // case TSDB_DATA_TYPE_BINARY:
return *(uint8_t *) varDataVal(val) == TSDB_DATA_BINARY_NULL; // return *(uint8_t *) varDataVal(val) == TSDB_DATA_BINARY_NULL;
default: // default:
return false; // return false;
}; // };
} //}
void setVardataNull(char* val, int32_t type) { void setVardataNull(char* val, int32_t type) {
if (type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_BINARY) {

View File

@ -52,7 +52,7 @@ public class TDNode {
public void start() { public void start() {
String selfPath = System.getProperty("user.dir"); String selfPath = System.getProperty("user.dir");
String binPath = ""; String binPath = "";
String projDir = selfPath + "/../../../../"; String projDir = selfPath + "/../../../";
try { try {
ArrayList<String> taosdPath = new ArrayList<>(); ArrayList<String> taosdPath = new ArrayList<>();
@ -68,7 +68,7 @@ public class TDNode {
return; return;
} else { } else {
for(String p : taosdPath) { for(String p : taosdPath) {
if(!p.contains("packing")) { if(!p.contains("packaging")) {
binPath = p; binPath = p;
} }
} }

View File

@ -20,7 +20,6 @@ extern "C" {
#endif #endif
typedef void* qinfo_t; typedef void* qinfo_t;
typedef void (*_qinfo_free_fn_t)(void*);
/** /**
* create the qinfo object according to QueryTableMsg * create the qinfo object according to QueryTableMsg
@ -29,13 +28,8 @@ typedef void (*_qinfo_free_fn_t)(void*);
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, _qinfo_free_fn_t fn, qinfo_t* qinfo); int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, qinfo_t* qinfo);
/**
* Destroy QInfo object
* @param qinfo qhandle
*/
void qDestroyQueryInfo(qinfo_t qinfo);
/** /**
* the main query execution function, including query on both table and multitables, * the main query execution function, including query on both table and multitables,
@ -84,8 +78,14 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
*/ */
int32_t qKillQuery(qinfo_t qinfo); int32_t qKillQuery(qinfo_t qinfo);
/**
* destroy query info structure
* @param qHandle
*/
void qDestroyQueryInfo(qinfo_t qHandle);
void* qOpenQueryMgmt(int32_t vgId); void* qOpenQueryMgmt(int32_t vgId);
void qSetQueryMgmtClosed(void* pExecutor); void qQueryMgmtNotifyClosed(void* pExecutor);
void qCleanupQueryMgmt(void* pExecutor); void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qAcquireQInfo(void* pMgmt, uint64_t key);

View File

@ -160,7 +160,32 @@ extern tDataTypeDescriptor tDataTypeDesc[11];
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*) #define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
bool isValidDataType(int32_t type); bool isValidDataType(int32_t type);
bool isNull(const char *val, int32_t type); //bool isNull(const char *val, int32_t type);
static inline __attribute__((always_inline)) bool isNull(const char *val, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return *(uint8_t *)val == TSDB_DATA_BOOL_NULL;
case TSDB_DATA_TYPE_TINYINT:
return *(uint8_t *)val == TSDB_DATA_TINYINT_NULL;
case TSDB_DATA_TYPE_SMALLINT:
return *(uint16_t *)val == TSDB_DATA_SMALLINT_NULL;
case TSDB_DATA_TYPE_INT:
return *(uint32_t *)val == TSDB_DATA_INT_NULL;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
return *(uint64_t *)val == TSDB_DATA_BIGINT_NULL;
case TSDB_DATA_TYPE_FLOAT:
return *(uint32_t *)val == TSDB_DATA_FLOAT_NULL;
case TSDB_DATA_TYPE_DOUBLE:
return *(uint64_t *)val == TSDB_DATA_DOUBLE_NULL;
case TSDB_DATA_TYPE_NCHAR:
return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
case TSDB_DATA_TYPE_BINARY:
return *(uint8_t *) varDataVal(val) == TSDB_DATA_BINARY_NULL;
default:
return false;
};
}
void setVardataNull(char* val, int32_t type); void setVardataNull(char* val, int32_t type);
void setNull(char *val, int32_t type, int32_t bytes); void setNull(char *val, int32_t type, int32_t bytes);

View File

@ -192,7 +192,6 @@ typedef struct SQInfo {
int32_t offset; // offset in group result set of subgroup, todo refactor int32_t offset; // offset in group result set of subgroup, todo refactor
SArray* arrTableIdInfo; SArray* arrTableIdInfo;
T_REF_DECLARE()
/* /*
* the query is executed position on which meter of the whole list. * the query is executed position on which meter of the whole list.
* when the index reaches the last one of the list, it means the query is completed. * when the index reaches the last one of the list, it means the query is completed.
@ -201,8 +200,6 @@ typedef struct SQInfo {
*/ */
int32_t tableIndex; int32_t tableIndex;
int32_t numOfGroupResultPages; int32_t numOfGroupResultPages;
_qinfo_free_fn_t freeFn; //todo remove it
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
} SQInfo; } SQInfo;

View File

@ -107,7 +107,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
void* tsBufDestory(STSBuf* pTSBuf); void* tsBufDestroy(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);

View File

@ -187,7 +187,7 @@ typedef struct SQLFunctionCtx {
} SQLFunctionCtx; } SQLFunctionCtx;
typedef struct SQLAggFuncElem { typedef struct SQLAggFuncElem {
char aName[TSDB_FUNCTIONS_NAME_MAX_LENGTH]; char aName[TSDB_FUNCTIONS_NAME_MAX_LENGTH];
uint8_t nAggIdx; // index of function in aAggs uint8_t nAggIdx; // index of function in aAggs
int8_t stableFuncId; // transfer function for super table query int8_t stableFuncId; // transfer function for super table query

View File

@ -44,7 +44,7 @@
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_QINFO_ADDR(x) ((void *)((char *)(x)-offsetof(SQInfo, runtimeEnv))) #define GET_QINFO_ADDR(x) ((SQInfo *)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) #define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@ -120,6 +120,7 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
static void setQueryStatus(SQuery *pQuery, int8_t status); static void setQueryStatus(SQuery *pQuery, int8_t status);
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
@ -351,27 +352,6 @@ static bool hasTagValOutput(SQuery* pQuery) {
return false; return false;
} }
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) {
// for a tag column, no corresponding field info
SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo;
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
return NULL;
}
/*
* Choose the right column field info by field id, since the file block may be out of date,
* which means the newest table schema is not equalled to the schema of this block.
* TODO: speedup by using bsearch
*/
for (int32_t i = 0; i < numOfCols; ++i) {
if (pColIndex->colId == pStatis[i].colId) {
return &pStatis[i];
}
}
return NULL;
}
/** /**
* @param pQuery * @param pQuery
* @param col * @param col
@ -380,19 +360,14 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t
* @param pColStatis * @param pColStatis
* @return * @return
*/ */
static bool hasNullValue(SQuery *pQuery, int32_t col, int32_t numOfCols, SDataStatis *pStatis, SDataStatis **pColStatis) { static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis **pColStatis) {
SColIndex *pColIndex = &pQuery->pSelectExpr[col].base.colInfo; if (TSDB_COL_IS_TAG(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
return false;
}
// query on primary timestamp column, not null value at all
if (pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return false; return false;
} }
if (pStatis != NULL) { if (pStatis != NULL) {
*pColStatis = getStatisInfo(pQuery, pStatis, numOfCols, col); *pColStatis = &pStatis[pColIndex->colIndex];
assert((*pColStatis)->colId == pColIndex->colId);
} else { } else {
*pColStatis = NULL; *pColStatis = NULL;
} }
@ -842,8 +817,8 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return NULL; return NULL;
} }
char *dataBlock = NULL;
char *dataBlock = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
@ -864,6 +839,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
if (sas->data == NULL) { if (sas->data == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -887,10 +863,14 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
} else { // other type of query function } else { // other type of query function
SColIndex *pCol = &pQuery->pSelectExpr[col].base.colInfo; SColIndex *pCol = &pQuery->pSelectExpr[col].base.colInfo;
if (TSDB_COL_IS_TAG(pCol->flag) || pDataBlock == NULL) { if (TSDB_COL_IS_TAG(pCol->flag)) {
dataBlock = NULL; dataBlock = NULL;
} else { } else {
dataBlock = getDataBlockImpl(pDataBlock, pCol->colId); SColIndex* pColIndex = &pQuery->pSelectExpr[col].base.colInfo;
SColumnInfoData *p = taosArrayGet(pDataBlock, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
dataBlock = p->pData;
} }
} }
@ -922,6 +902,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) { if (sasArray == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -1168,6 +1149,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) { if (sasArray == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -1365,7 +1347,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId; int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId;
SDataStatis *tpField = NULL; SDataStatis *tpField = NULL;
pCtx->hasNull = hasNullValue(pQuery, colIndex, pBlockInfo->numOfCols, pStatis, &tpField); pCtx->hasNull = hasNullValue(&pQuery->pSelectExpr[colIndex].base.colInfo, pStatis, &tpField);
pCtx->aInputElemBuf = inputData; pCtx->aInputElemBuf = inputData;
if (tpField != NULL) { if (tpField != NULL) {
@ -1619,22 +1601,21 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
} }
static bool isQueryKilled(SQInfo *pQInfo) { #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
}
static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
static bool isFixedOutputQuery(SQuery *pQuery) { static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
if (pQuery->intervalTime != 0) { SQuery* pQuery = pRuntimeEnv->pQuery;
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
return false; return false;
} }
// Note:top/bottom query is fixed output query // Note:top/bottom query is fixed output query
if (isTopBottomQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (pRuntimeEnv->topBotQuery || pRuntimeEnv->groupbyNormalCol) {
return true; return true;
} }
@ -2054,7 +2035,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
// check if this data block is required to load // check if this data block is required to load
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
int32_t functionId = pSqlFunc->functionId; int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId; int32_t colId = pSqlFunc->colInfo.colId;
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId); r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
@ -2066,8 +2047,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
} }
if (r == BLK_DATA_NO_NEEDED) { if (r == BLK_DATA_NO_NEEDED) {
qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), qDebug("QInfo:%p data block discard, rows:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->rows);
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pRuntimeEnv->summary.discardBlocks += 1; pRuntimeEnv->summary.discardBlocks += 1;
} else if (r == BLK_DATA_STATIS_NEEDED) { } else if (r == BLK_DATA_STATIS_NEEDED) {
if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
@ -2199,7 +2179,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pQuery)) { if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pRuntimeEnv)) {
SResultRec *pRec = &pQuery->rec; SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) { if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
@ -2249,8 +2229,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0; if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
@ -3304,8 +3286,9 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
cond.twindow.skey, cond.twindow.ekey); cond.twindow.skey, cond.twindow.ekey);
// check if query is killed or not // check if query is killed or not
if (isQueryKilled(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
return; finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
} }
@ -3695,23 +3678,24 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo) {
assert(pQuery->rec.rows <= pQuery->rec.capacity); assert(pQuery->rec.rows <= pQuery->rec.capacity);
} }
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// update the number of result for each, only update the number of rows for the corresponding window result. // update the number of result for each, only update the number of rows for the corresponding window result.
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
return;
}
for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) { for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i]; SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pRuntimeEnv->pCtx[j].functionId; int32_t functionId = pRuntimeEnv->pCtx[j].functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) { if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
continue; continue;
}
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
} }
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
} }
} }
} }
@ -3729,8 +3713,6 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *
} else { } else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
} }
updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo);
} }
bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
@ -3950,8 +3932,9 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
return; finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
@ -4099,7 +4082,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
return; return;
} }
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) { if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))) {
return; return;
} }
@ -4115,7 +4098,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
&& (cond.order == TSDB_ORDER_ASC) && (cond.order == TSDB_ORDER_ASC)
&& (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!QUERY_IS_INTERVAL_QUERY(pQuery))
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) && (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
&& (!isFixedOutputQuery(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))
) { ) {
SArray* pa = GET_TABLEGROUP(pQInfo, 0); SArray* pa = GET_TABLEGROUP(pQInfo, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0); STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
@ -4158,7 +4141,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->precision = tsdbGetCfg(tsdb)->precision; pQuery->precision = tsdbGetCfg(tsdb)->precision;
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
setScanLimitationByResultBuffer(pQuery); setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQInfo, false); changeExecuteScanOrder(pQInfo, false);
@ -4236,10 +4222,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pQuery->fillType, pColInfo); pQuery->fillType, pColInfo);
} }
// todo refactor
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -4267,8 +4249,9 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (isQueryKilled(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
break; finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
@ -4304,6 +4287,8 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey); pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey);
} }
updateWindowResNumOfRes(pRuntimeEnv);
int64_t et = taosGetTimestampMs(); int64_t et = taosGetTimestampMs();
return et - st; return et - st;
} }
@ -4316,7 +4301,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
SArray *group = GET_TABLEGROUP(pQInfo, 0); SArray *group = GET_TABLEGROUP(pQInfo, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index); STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb); if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
}
STableId* id = TSDB_TABLEID(pCheckInfo->pTable); STableId* id = TSDB_TABLEID(pCheckInfo->pTable);
qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index, qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
@ -4549,8 +4536,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
if (isQueryKilled(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
return; finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
pQuery->current = taosArrayGetP(group, pQInfo->tableIndex); pQuery->current = taosArrayGetP(group, pQInfo->tableIndex);
@ -4744,9 +4732,10 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el); qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el);
// query error occurred or query is killed, abort current execution // query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
return; finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
// close all time window results // close all time window results
@ -4766,9 +4755,10 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
return; finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) { if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
@ -4806,8 +4796,9 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
finalizeQueryResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
return; finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
} }
// since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously. // since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously.
@ -4839,10 +4830,6 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
finalizeQueryResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
return;
}
pQuery->rec.rows = getNumOfResult(pRuntimeEnv); pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.rows > 0) { if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.rows > 0) {
skipResults(pRuntimeEnv); skipResults(pRuntimeEnv);
@ -4887,10 +4874,6 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start)
while (1) { while (1) {
scanOneTableDataBlocks(pRuntimeEnv, start); scanOneTableDataBlocks(pRuntimeEnv, start);
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return;
}
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED)); assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
finalizeQueryResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
@ -5024,7 +5007,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// group by normal column, sliding window query, interval query are handled by interval query processor // group by normal column, sliding window query, interval query are handled by interval query processor
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { // interval (down sampling operation) if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { // interval (down sampling operation)
tableIntervalProcess(pQInfo, item); tableIntervalProcess(pQInfo, item);
} else if (isFixedOutputQuery(pQuery)) { } else if (isFixedOutputQuery(pRuntimeEnv)) {
tableFixedOutputProcess(pQInfo, item); tableFixedOutputProcess(pQInfo, item);
} else { // diff/add/multiply/subtract/division } else { // diff/add/multiply/subtract/division
assert(pQuery->checkBuffer == 1); assert(pQuery->checkBuffer == 1);
@ -5044,7 +5027,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (QUERY_IS_INTERVAL_QUERY(pQuery) || if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol && (isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol &&
!isFirstLastRowQuery(pQuery))) { !isFirstLastRowQuery(pQuery))) {
multiTableQueryProcess(pQInfo); multiTableQueryProcess(pQInfo);
} else { } else {
@ -5811,7 +5794,7 @@ static bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo); return (sig == (uint64_t)pQInfo);
} }
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param, _qinfo_free_fn_t fn) { static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
@ -5836,7 +5819,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
} }
pQInfo->param = param; pQInfo->param = param;
pQInfo->freeFn = fn;
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
@ -6038,8 +6020,7 @@ typedef struct SQueryMgmt {
pthread_mutex_t lock; pthread_mutex_t lock;
} SQueryMgmt; } SQueryMgmt;
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn, int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, qinfo_t* pQInfo) {
qinfo_t* pQInfo) {
assert(pQueryMsg != NULL && tsdb != NULL); assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -6129,7 +6110,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
goto _over; goto _over;
} }
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param, fn); code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param);
_over: _over:
free(tagCond); free(tagCond);
@ -6153,43 +6134,25 @@ _over:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
*pQInfo = NULL; *pQInfo = NULL;
} else { } else {
SQInfo* pq = (SQInfo*) (*pQInfo); // SQInfo* pq = (SQInfo*) (*pQInfo);
T_REF_INC(pq); // T_REF_INC(pq);
T_REF_INC(pq); // T_REF_INC(pq);
} }
// if failed to add ref for all meters in this query, abort current query // if failed to add ref for all meters in this query, abort current query
return code; return code;
} }
static void doDestoryQueryInfo(SQInfo* pQInfo) {
assert(pQInfo != NULL);
qDebug("QInfo:%p query completed", pQInfo);
queryCostStatis(pQInfo); // print the query cost summary
freeQInfo(pQInfo);
}
void qDestroyQueryInfo(qinfo_t qHandle) { void qDestroyQueryInfo(qinfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle; SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) { if (!isValidQInfo(pQInfo)) {
return; return;
} }
int32_t ref = T_REF_DEC(pQInfo); qDebug("QInfo:%p query completed", pQInfo);
qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); queryCostStatis(pQInfo); // print the query cost summary
freeQInfo(pQInfo);
if (ref == 0) {
_qinfo_free_fn_t freeFp = pQInfo->freeFn;
void* param = pQInfo->param;
doDestoryQueryInfo(pQInfo);
if (freeFp != NULL) {
assert(param != NULL);
freeFp(param);
}
}
} }
void qTableQuery(qinfo_t qinfo) { void qTableQuery(qinfo_t qinfo) {
@ -6200,31 +6163,24 @@ void qTableQuery(qinfo_t qinfo) {
return; return;
} }
if (isQueryKilled(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return; return;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo); qDebug("QInfo:%p no table exists for query, abort", pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return; return;
} }
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
// error occurs, record the error code and return to client // error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret; pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error occurs, code:%s", pQInfo, tstrerror(pQInfo->code)); qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return; return;
} }
@ -6241,7 +6197,7 @@ void qTableQuery(qinfo_t qinfo) {
} }
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if (isQueryKilled(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo); qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) { } else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
@ -6251,7 +6207,6 @@ void qTableQuery(qinfo_t qinfo) {
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
} }
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
@ -6262,7 +6217,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
} }
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isQueryKilled(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
return pQInfo->code; return pQInfo->code;
} }
@ -6295,7 +6250,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
} }
if (ret) { if (ret) {
T_REF_INC(pQInfo); // T_REF_INC(pQInfo);
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
} }
@ -6337,7 +6292,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
code = pQInfo->code; code = pQInfo->code;
} }
if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
(*pRsp)->completed = 1; // notify no more result to client (*pRsp)->completed = 1; // notify no more result to client
} }
@ -6352,7 +6307,6 @@ int32_t qKillQuery(qinfo_t qinfo) {
} }
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
qDestroyQueryInfo(pQInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -6497,6 +6451,7 @@ void freeqinfoFn(void *qhandle) {
} }
qKillQuery(*handle); qKillQuery(*handle);
qDestroyQueryInfo(*handle);
} }
void* qOpenQueryMgmt(int32_t vgId) { void* qOpenQueryMgmt(int32_t vgId) {
@ -6515,7 +6470,11 @@ void* qOpenQueryMgmt(int32_t vgId) {
return pQueryHandle; return pQueryHandle;
} }
void qSetQueryMgmtClosed(void* pQMgmt) { static void queryMgmtKillQueryFn(void* handle) {
qKillQuery(handle);
}
void qQueryMgmtNotifyClosed(void* pQMgmt) {
if (pQMgmt == NULL) { if (pQMgmt == NULL) {
return; return;
} }
@ -6527,7 +6486,7 @@ void qSetQueryMgmtClosed(void* pQMgmt) {
pQueryMgmt->closed = true; pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock); pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, freeqinfoFn); taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
} }
void qCleanupQueryMgmt(void* pQMgmt) { void qCleanupQueryMgmt(void* pQMgmt) {

View File

@ -509,10 +509,11 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
for (i = 1; isdigit(z[i]); i++) { for (i = 1; isdigit(z[i]); i++) {
} }
/* here is the 1a/2s/3m/9y */ /* here is the 1u/1a/2s/3m/9y */
if ((z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || z[i] == 'y' || if ((z[i] == 'u' || z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' ||
z[i] == 'w' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' || z[i] == 'y' || z[i] == 'w' ||
z[i] == 'Y' || z[i] == 'W') && z[i] == 'U' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
z[i] == 'Y' || z[i] == 'W') &&
(isIdChar[(uint8_t)z[i + 1]] == 0)) { (isIdChar[(uint8_t)z[i + 1]] == 0)) {
*tokenType = TK_VARIABLE; *tokenType = TK_VARIABLE;
i += 1; i += 1;

View File

@ -79,7 +79,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf->numOfAlloc = header.numOfVnode; pTSBuf->numOfAlloc = header.numOfVnode;
STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc); STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc);
if (tmp == NULL) { if (tmp == NULL) {
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
@ -92,7 +92,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf->tsOrder = header.tsOrder; pTSBuf->tsOrder = header.tsOrder;
if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) { if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
// tscError("invalid order info in buf:%d", pTSBuf->tsOrder); // tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
@ -100,7 +100,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
if (buf == NULL) { if (buf == NULL) {
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
@ -120,7 +120,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
struct stat fileStat; struct stat fileStat;
if (fstat(fileno(pTSBuf->f), &fileStat) != 0) { if (fstat(fileno(pTSBuf->f), &fileStat) != 0) {
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
@ -137,7 +137,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
return pTSBuf; return pTSBuf;
} }
void* tsBufDestory(STSBuf* pTSBuf) { void* tsBufDestroy(STSBuf* pTSBuf) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return NULL; return NULL;
} }
@ -920,13 +920,13 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE; pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE;
pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx)); pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx));
if (pTSBuf->pData == NULL) { if (pTSBuf->pData == NULL) {
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE); pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE);
if (pTSBuf->tsData.rawBuf == NULL) { if (pTSBuf->tsData.rawBuf == NULL) {
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
@ -936,13 +936,13 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
pTSBuf->assistBuf = malloc(MEM_BUF_SIZE); pTSBuf->assistBuf = malloc(MEM_BUF_SIZE);
if (pTSBuf->assistBuf == NULL) { if (pTSBuf->assistBuf == NULL) {
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }
pTSBuf->block.payload = malloc(MEM_BUF_SIZE); pTSBuf->block.payload = malloc(MEM_BUF_SIZE);
if (pTSBuf->block.payload == NULL) { if (pTSBuf->block.payload == NULL) {
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
return NULL; return NULL;
} }

View File

@ -47,7 +47,7 @@ void simpleTest() {
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
EXPECT_EQ(pTSBuf->block.numOfElem, num); EXPECT_EQ(pTSBuf->block.numOfElem, num);
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
} }
// one large list of ts, the ts list need to be split into several small blocks // one large list of ts, the ts list need to be split into several small blocks
@ -71,7 +71,7 @@ void largeTSTest() {
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
EXPECT_EQ(pTSBuf->block.numOfElem, num); EXPECT_EQ(pTSBuf->block.numOfElem, num);
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
} }
void multiTagsTest() { void multiTagsTest() {
@ -101,7 +101,7 @@ void multiTagsTest() {
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
EXPECT_EQ(pTSBuf->block.numOfElem, num); EXPECT_EQ(pTSBuf->block.numOfElem, num);
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
} }
void multiVnodeTagsTest() { void multiVnodeTagsTest() {
@ -139,7 +139,7 @@ void multiVnodeTagsTest() {
EXPECT_EQ(pTSBuf->tsData.len, 0); EXPECT_EQ(pTSBuf->tsData.len, 0);
EXPECT_EQ(pTSBuf->block.numOfElem, num); EXPECT_EQ(pTSBuf->block.numOfElem, num);
tsBufDestory(pTSBuf); tsBufDestroy(pTSBuf);
} }
void loadDataTest() { void loadDataTest() {
@ -386,8 +386,8 @@ void mergeDiffVnodeBufferTest() {
tsBufDisplay(pTSBuf1); tsBufDisplay(pTSBuf1);
tsBufDestory(pTSBuf2); tsBufDestroy(pTSBuf2);
tsBufDestory(pTSBuf1); tsBufDestroy(pTSBuf1);
} }
void mergeIdenticalVnodeBufferTest() { void mergeIdenticalVnodeBufferTest() {
@ -432,8 +432,8 @@ void mergeIdenticalVnodeBufferTest() {
printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts); printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag, elem.ts);
} }
tsBufDestory(pTSBuf1); tsBufDestroy(pTSBuf1);
tsBufDestory(pTSBuf2); tsBufDestroy(pTSBuf2);
} }
} // namespace } // namespace

View File

@ -110,9 +110,10 @@ typedef struct STsdbQueryHandle {
SFileGroupIter fileIter; SFileGroupIter fileIter;
SRWHelper rhelper; SRWHelper rhelper;
STableBlockInfo* pDataBlockInfo; STableBlockInfo* pDataBlockInfo;
int32_t allocSize; // allocated data block size
SMemTable* mem; // mem-table SMemTable* mem; // mem-table
SMemTable* imem; // imem-table, acquired from snapshot SMemTable* imem; // imem-table, acquired from snapshot
SArray* defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
} STsdbQueryHandle; } STsdbQueryHandle;
@ -136,6 +137,34 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileId = -1; pCompBlockLoadInfo->fileId = -1;
} }
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
assert(numOfCols <= TSDB_MAX_COLUMNS);
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
taosArrayPush(pIdList, &pCol->info.colId);
}
return pIdList;
}
static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
SArray* pLocalIdList = getColumnIdList(pQueryHandle);
// check if the primary time stamp column needs to load
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
// the primary timestamp column does not be included in the the specified load column list, add it
if (loadTS && colId != 0) {
int16_t columnId = 0;
taosArrayInsert(pLocalIdList, 0, &columnId);
}
return pLocalIdList;
}
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
@ -148,7 +177,8 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->activeIndex = 0; // current active table index pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qinfo = qinfo; pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
pQueryHandle->allocSize = 0;
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem); tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
@ -195,7 +225,9 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
taosArrayPush(pQueryHandle->pTableCheckInfo, &info); taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
} }
} }
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
@ -546,33 +578,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
.tid = (_checkInfo)->tableId.tid, \ .tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid}) .uid = (_checkInfo)->tableId.uid})
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
assert(numOfCols <= TSDB_MAX_COLUMNS);
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
taosArrayPush(pIdList, &pCol->info.colId);
}
return pIdList;
}
static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
SArray* pLocalIdList = getColumnIdList(pQueryHandle);
// check if the primary time stamp column needs to load
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
// the primary timestamp column does not be included in the the specified load column list, add it
if (loadTS && colId != 0) {
int16_t columnId = 0;
taosArrayInsert(pLocalIdList, 0, &columnId);
}
return pLocalIdList;
}
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) { static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
STsdbRepo *pRepo = pQueryHandle->pTsdb; STsdbRepo *pRepo = pQueryHandle->pTsdb;
@ -584,8 +590,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
data->uid = pCheckInfo->pTableObj->tableId.uid; data->uid = pCheckInfo->pTableObj->tableId.uid;
bool blockLoaded = false; bool blockLoaded = false;
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) { if (pCheckInfo->pDataCols == NULL) {
@ -613,7 +617,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
pBlock->numOfRows = pCols->numOfRows; pBlock->numOfRows = pCols->numOfRows;
taosArrayDestroy(sa);
tfree(data); tfree(data);
int64_t et = taosGetTimestampUs() - st; int64_t et = taosGetTimestampUs() - st;
@ -656,12 +659,8 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
return; return;
} }
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
taosArrayDestroy(sa);
} else { } else {
/* /*
* no data in cache, only load data from file * no data in cache, only load data from file
@ -681,14 +680,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
} }
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) { static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// query ended in current block // query ended in current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) { if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
taosArrayDestroy(sa);
return false; return false;
} }
@ -702,7 +699,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = 0; cur->pos = 0;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
@ -719,13 +716,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfRows - 1; cur->pos = pBlock->numOfRows - 1;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
} }
taosArrayDestroy(sa);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
@ -1250,13 +1246,19 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
} }
static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) { static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
char* tmp = realloc(pQueryHandle->pDataBlockInfo, sizeof(STableBlockInfo) * numOfBlocks); size_t size = sizeof(STableBlockInfo) * numOfBlocks;
if (tmp == NULL) {
return TSDB_CODE_TDB_OUT_OF_MEMORY; if (pQueryHandle->allocSize < size) {
pQueryHandle->allocSize = size;
char* tmp = realloc(pQueryHandle->pDataBlockInfo, pQueryHandle->allocSize);
if (tmp == NULL) {
return TSDB_CODE_TDB_OUT_OF_MEMORY;
}
pQueryHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
} }
pQueryHandle->pDataBlockInfo = (STableBlockInfo*) tmp; memset(pQueryHandle->pDataBlockInfo, 0, size);
memset(pQueryHandle->pDataBlockInfo, 0, sizeof(STableBlockInfo) * numOfBlocks);
*numOfAllocBlocks = numOfBlocks; *numOfAllocBlocks = numOfBlocks;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
@ -1492,9 +1494,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return false; return false;
} }
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, sa); /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn);
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return // data already retrieve, discard other data rows and return
@ -1508,7 +1509,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pQueryHandle->window = pQueryHandle->cur.win; pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.rows = 1; pQueryHandle->cur.rows = 1;
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
taosArrayDestroy(sa);
return true; return true;
} else { } else {
STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
@ -1565,7 +1565,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
assert(ret); assert(ret);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, sa); /*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
@ -2333,6 +2333,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
} }
taosArrayDestroy(pQueryHandle->pColumns); taosArrayDestroy(pQueryHandle->pColumns);
taosArrayDestroy(pQueryHandle->defaultLoadColumn);
tfree(pQueryHandle->pDataBlockInfo); tfree(pQueryHandle->pDataBlockInfo);
tfree(pQueryHandle->statis); tfree(pQueryHandle->statis);

View File

@ -23,7 +23,7 @@ extern "C" {
#include "os.h" #include "os.h"
#define TARRAY_MIN_SIZE 8 #define TARRAY_MIN_SIZE 8
#define TARRAY_GET_ELEM(array, index) ((array)->pData + (index) * (array)->elemSize) #define TARRAY_GET_ELEM(array, index) ((void*)((array)->pData + (index) * (array)->elemSize))
typedef struct SArray { typedef struct SArray {
size_t size; size_t size;

View File

@ -33,17 +33,20 @@ typedef struct SCacheStatis {
int64_t refreshCount; int64_t refreshCount;
} SCacheStatis; } SCacheStatis;
struct STrashElem;
typedef struct SCacheDataNode { typedef struct SCacheDataNode {
uint64_t addedTime; // the added time when this element is added or updated into cache uint64_t addedTime; // the added time when this element is added or updated into cache
uint64_t lifespan; // expiredTime expiredTime when this element should be remove from cache uint64_t lifespan; // life duration when this element should be remove from cache
uint64_t signature; uint64_t expireTime; // expire time
uint32_t size; // allocated size for current SCacheDataNode uint64_t signature;
struct STrashElem *pTNodeHeader; // point to trash node head
uint16_t keySize: 15; // max key size: 32kb
bool inTrashCan: 1;// denote if it is in trash or not
uint32_t size; // allocated size for current SCacheDataNode
T_REF_DECLARE() T_REF_DECLARE()
uint16_t keySize: 15; // max key size: 32kb char *key;
bool inTrashCan: 1;// denote if it is in trash or not char data[];
int32_t extendFactor; // number of life span extend
char *key;
char data[];
} SCacheDataNode; } SCacheDataNode;
typedef struct STrashElem { typedef struct STrashElem {

View File

@ -116,11 +116,13 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
return; return;
} }
int32_t size = pNode->size;
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
pCacheObj->totalSize -= pNode->size;
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size); pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
pNode->size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode); free(pNode);
} }
@ -285,7 +287,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64 uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
"bytes size:%" PRId64 "bytes", "bytes size:%" PRId64 "bytes",
pCacheObj->name, key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), pCacheObj->name, key, pNode->data, pNode->addedTime, pNode->expireTime,
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize); (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize);
} else { } else {
uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key); uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
@ -312,16 +314,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
int32_t ref = 0; int32_t ref = 0;
if (ptNode != NULL) { if (ptNode != NULL) {
ref = T_REF_INC(*ptNode); ref = T_REF_INC(*ptNode);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if (pCacheObj->extendLifespan) {
int64_t now = taosGetTimestampMs();
if ((now - (*ptNode)->addedTime) < (*ptNode)->lifespan * (*ptNode)->extendFactor) {
(*ptNode)->extendFactor += 1;
uDebug("key:%p extend life time to %"PRId64, key, (*ptNode)->lifespan * (*ptNode)->extendFactor + (*ptNode)->addedTime);
}
}
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
@ -347,8 +339,7 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
if (ptNode != NULL) { if (ptNode != NULL) {
T_REF_INC(*ptNode); T_REF_INC(*ptNode);
(*ptNode)->extendFactor += 1; (*ptNode)->expireTime = taosGetTimestampMs() + (*ptNode)->lifespan;
// (*ptNode)->lifespan = expireTime;
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
@ -380,17 +371,6 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
int32_t ref = T_REF_INC(ptNode); int32_t ref = T_REF_INC(ptNode);
uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref); uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if (pCacheObj->extendLifespan) {
int64_t now = taosGetTimestampMs();
if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) {
ptNode->extendFactor += 1;
uDebug("cache:%s, %p extend life time to %" PRId64, pCacheObj->name, ptNode->data,
ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime);
}
}
// the data if referenced by at least one object, so the reference count must be greater than the value of 2. // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert(ref >= 2); assert(ref >= 2);
return data; return data;
@ -431,22 +411,58 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} }
*data = NULL; *data = NULL;
int16_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);
if (_remove && (!pNode->inTrashCan)) { // note: extend lifespan before dec ref count
__cache_wr_lock(pCacheObj); if (pCacheObj->extendLifespan) {
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime);
}
if (T_REF_VAL_GET(pNode) == 0) { bool inTrashCan = pNode->inTrashCan;
// remove directly, if not referenced by other users uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1);
taosCacheReleaseNode(pCacheObj, pNode);
} else { // NOTE: once refcount is decrease, pNode may be free by other thread immediately.
// pNode may be released immediately by other thread after the reference count of pNode is set to 0, int32_t ref = T_REF_DEC(pNode);
// So we need to lock it in the first place.
taosCacheMoveToTrash(pCacheObj, pNode); if (inTrashCan) {
// Remove it if the ref count is 0.
// The ref count does not need to load and check again after lock acquired, since ref count can not be increased when
// the node is in trashcan.
if (ref == 0) {
__cache_wr_lock(pCacheObj);
assert(pNode->pTNodeHeader->pData == pNode);
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
__cache_unlock(pCacheObj);
} }
__cache_unlock(pCacheObj); } else {
assert(pNode->pTNodeHeader == NULL);
if (_remove) { // not in trash can, but need to remove it
__cache_wr_lock(pCacheObj);
/*
* If not referenced by other users. Otherwise move this node to trashcan wait for all users
* releasing this resources.
*
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
* that tries to do the same thing.
*/
if (ref == 0) {
if (T_REF_VAL_GET(pNode) == 0) {
taosCacheReleaseNode(pCacheObj, pNode);
} else {
taosCacheMoveToTrash(pCacheObj, pNode);
}
}
__cache_unlock(pCacheObj);
// } else { // extend its life time
// if (pCacheObj->extendLifespan) {
// atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
// uDebug("cache:%s data:%p extend life time to %"PRId64 " after release", pCacheObj->name, pNode->data, pNode->expireTime);
// }
}
} }
} }
@ -486,7 +502,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
uint64_t duration) { uint64_t duration) {
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1; size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
SCacheDataNode *pNewNode = calloc(1, totalSize); SCacheDataNode *pNewNode = calloc(1, totalSize);
if (pNewNode == NULL) { if (pNewNode == NULL) {
@ -503,7 +519,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
pNewNode->lifespan = duration; pNewNode->lifespan = duration;
pNewNode->extendFactor = 1; pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
pNewNode->signature = (uint64_t)pNewNode; pNewNode->signature = (uint64_t)pNewNode;
pNewNode->size = (uint32_t)totalSize; pNewNode->size = (uint32_t)totalSize;
@ -512,6 +528,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->inTrashCan) { /* node is already in trash */ if (pNode->inTrashCan) { /* node is already in trash */
assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
return; return;
} }
@ -527,6 +544,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pCacheObj->pTrash = pElem; pCacheObj->pTrash = pElem;
pNode->inTrashCan = true; pNode->inTrashCan = true;
pNode->pTNodeHeader = pElem;
pCacheObj->numOfElemsInTrash++; pCacheObj->numOfElemsInTrash++;
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash); uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
@ -629,7 +647,7 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
while (taosHashIterNext(pIter)) { while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
if ((pNode->addedTime + pNode->lifespan * pNode->extendFactor) <= time && T_REF_VAL_GET(pNode) <= 0) { if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
continue; continue;
} }

View File

@ -317,29 +317,34 @@ int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) {
static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* result) { static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* result) {
*result = val; *result = val;
int64_t factor = 1000L;
switch (unit) { switch (unit) {
case 's': case 's':
(*result) *= MILLISECOND_PER_SECOND; (*result) *= MILLISECOND_PER_SECOND*factor;
break; break;
case 'm': case 'm':
(*result) *= MILLISECOND_PER_MINUTE; (*result) *= MILLISECOND_PER_MINUTE*factor;
break; break;
case 'h': case 'h':
(*result) *= MILLISECOND_PER_HOUR; (*result) *= MILLISECOND_PER_HOUR*factor;
break; break;
case 'd': case 'd':
(*result) *= MILLISECOND_PER_DAY; (*result) *= MILLISECOND_PER_DAY*factor;
break; break;
case 'w': case 'w':
(*result) *= MILLISECOND_PER_WEEK; (*result) *= MILLISECOND_PER_WEEK*factor;
break; break;
case 'n': case 'n':
(*result) *= MILLISECOND_PER_MONTH; (*result) *= MILLISECOND_PER_MONTH*factor;
break; break;
case 'y': case 'y':
(*result) *= MILLISECOND_PER_YEAR; (*result) *= MILLISECOND_PER_YEAR*factor;
break; break;
case 'a': case 'a':
(*result) *= factor;
break;
case 'u':
break; break;
default: { default: {
; ;
@ -348,7 +353,6 @@ static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* resu
} }
/* get the value in microsecond */ /* get the value in microsecond */
(*result) *= 1000L;
return 0; return 0;
} }

View File

@ -508,7 +508,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
// release local resources only after cutting off outside connections // release local resources only after cutting off outside connections
qSetQueryMgmtClosed(pVnode->qMgmt); qQueryMgmtNotifyClosed(pVnode->qMgmt);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }

View File

@ -82,6 +82,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else { } else {
assert(*qhandle == (void*) killQueryMsg->qhandle); assert(*qhandle == (void*) killQueryMsg->qhandle);
qKillQuery(*qhandle);
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
} }
@ -93,7 +94,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
@ -108,9 +109,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
if (handle == NULL) { // failed to register qhandle if (handle == NULL) { // failed to register qhandle
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
qDestroyQueryInfo(pQInfo); // destroy it directly
qKillQuery(pQInfo);
qKillQuery(pQInfo);
} else { } else {
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) pQInfo); pRsp->qhandle = htobe64((uint64_t) pQInfo);
@ -120,10 +119,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// NOTE: there two refcount, needs to kill twice
// query has not been put into qhandle pool, kill it directly.
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return pRsp->code; return pRsp->code;
} }
@ -134,6 +129,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
dnodePutItemIntoReadQueue(pVnode, *handle); dnodePutItemIntoReadQueue(pVnode, *handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
@ -183,6 +179,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
@ -209,6 +206,9 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
dnodePutItemIntoReadQueue(pVnode, *handle); dnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
freeHandle = false; freeHandle = false;
} else {
qKillQuery(*handle);
freeHandle = true;
} }
} }
} }

View File

@ -37,17 +37,8 @@ class TDTestCase:
except Exception as e: except Exception as e:
tdLog.exit(e) tdLog.exit(e)
try: tdSql.error("select * from db.st")
tdSql.execute("select * from db.st") tdSql.error("select * from db.tb")
except Exception as e:
if e.args[0] != 'mnode invalid table name':
tdLog.exit(e)
try:
tdSql.execute("select * from db.tb")
except Exception as e:
if e.args[0] != 'mnode invalid table name':
tdLog.exit(e)
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -652,25 +652,25 @@ endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data11 != null then if $data11 != NULL then
return -1 return -1
endi endi
if $data21 != 1 then if $data21 != 1 then
return -1 return -1
endi endi
if $data31 != null then if $data31 != NULL then
return -1 return -1
endi endi
if $data41 != 1 then if $data41 != 1 then
return -1 return -1
endi endi
if $data51 != null then if $data51 != NULL then
return -1 return -1
endi endi
if $data61 != 1 then if $data61 != 1 then
return -1 return -1
endi endi
if $data71 != null then if $data71 != NULL then
return -1 return -1
endi endi
if $data81 != 1 then if $data81 != 1 then
@ -689,25 +689,25 @@ endi
if $data01 != 0.000000000 then if $data01 != 0.000000000 then
return -1 return -1
endi endi
if $data11 != null then if $data11 != NULL then
return -1 return -1
endi endi
if $data21 != 1.000000000 then if $data21 != 1.000000000 then
return -1 return -1
endi endi
if $data31 != null then if $data31 != NULL then
return -1 return -1
endi endi
if $data41 != 2.000000000 then if $data41 != 2.000000000 then
return -1 return -1
endi endi
if $data51 != null then if $data51 != NULL then
return -1 return -1
endi endi
if $data61 != 3.000000000 then if $data61 != 3.000000000 then
return -1 return -1
endi endi
if $data71 != null then if $data71 != NULL then
return -1 return -1
endi endi
if $data81 != 4.000000000 then if $data81 != 4.000000000 then
@ -722,25 +722,25 @@ endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data11 != null then if $data11 != NULL then
return -1 return -1
endi endi
if $data21 != 1 then if $data21 != 1 then
return -1 return -1
endi endi
if $data31 != null then if $data31 != NULL then
return -1 return -1
endi endi
if $data41 != 2 then if $data41 != 2 then
return -1 return -1
endi endi
if $data51 != null then if $data51 != NULL then
return -1 return -1
endi endi
if $data61 != 3 then if $data61 != 3 then
return -1 return -1
endi endi
if $data71 != null then if $data71 != NULL then
return -1 return -1
endi endi
if $data81 != 4 then if $data81 != 4 then
@ -755,25 +755,25 @@ endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data11 != null then if $data11 != NULL then
return -1 return -1
endi endi
if $data21 != 1 then if $data21 != 1 then
return -1 return -1
endi endi
if $data31 != null then if $data31 != NULL then
return -1 return -1
endi endi
if $data41 != 2 then if $data41 != 2 then
return -1 return -1
endi endi
if $data51 != null then if $data51 != NULL then
return -1 return -1
endi endi
if $data61 != 3 then if $data61 != 3 then
return -1 return -1
endi endi
if $data71 != null then if $data71 != NULL then
return -1 return -1
endi endi
if $data81 != 4 then if $data81 != 4 then
@ -788,25 +788,25 @@ endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data11 != null then if $data11 != NULL then
return -1 return -1
endi endi
if $data21 != 1 then if $data21 != 1 then
return -1 return -1
endi endi
if $data31 != null then if $data31 != NULL then
return -1 return -1
endi endi
if $data41 != 2 then if $data41 != 2 then
return -1 return -1
endi endi
if $data51 != null then if $data51 != NULL then
return -1 return -1
endi endi
if $data61 != 3 then if $data61 != 3 then
return -1 return -1
endi endi
if $data71 != null then if $data71 != NULL then
return -1 return -1
endi endi
if $data81 != 4 then if $data81 != 4 then
@ -821,25 +821,25 @@ endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data11 != null then if $data11 != NULL then
return -1 return -1
endi endi
if $data21 != 1 then if $data21 != 1 then
return -1 return -1
endi endi
if $data31 != null then if $data31 != NULL then
return -1 return -1
endi endi
if $data41 != 2 then if $data41 != 2 then
return -1 return -1
endi endi
if $data51 != null then if $data51 != NULL then
return -1 return -1
endi endi
if $data61 != 3 then if $data61 != 3 then
return -1 return -1
endi endi
if $data71 != null then if $data71 != NULL then
return -1 return -1
endi endi
if $data81 != 4 then if $data81 != 4 then