diff --git a/.gitignore b/.gitignore index ba8611030b..f2c1cb75b3 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ rpms/ mac/ *.pyc *.tmp +*.swp src/connector/nodejs/node_modules/ src/connector/nodejs/out/ tests/test/ diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9cb43a9f36..054b2894c5 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -357,7 +357,6 @@ typedef struct SSqlObj { char freed : 4; char listed : 4; tsem_t rspSem; - tsem_t emptyRspSem; SSqlCmd cmd; SSqlRes res; uint8_t numOfSubs; @@ -409,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); void tscQueueAsyncRes(SSqlObj *pSql); -void tscQueueAsyncError(void(*fp), void *param); +void tscQueueAsyncError(void(*fp), void *param, int32_t code); int tscProcessLocalCmd(SSqlObj *pSql); int tscCfgDynamicOptions(char *msg); @@ -450,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj); void tscCloseTscObj(STscObj *pObj); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen); +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index c1738e6801..10878ee37f 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) { +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { tscError("failed to malloc payload"); - tfree(pSql); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); return; } - pSql->sqlstr = malloc(sqlLen + 1); + pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); free(pCmd->payload); - free(pSql); return; } @@ -75,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code != TSDB_CODE_SUCCESS) { - pSql->res.code = (uint8_t)code; + pSql->res.code = code; tscQueueAsyncRes(pSql); return; } @@ -88,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { tscError("bug!!! pObj:%p", pObj); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } int32_t sqlLen = strlen(sqlstr); if (sqlLen > tsMaxSQLStringLen) { tscError("sql string too long"); - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_INVALID_SQL; + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL); return; } @@ -105,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { tscError("failed to malloc sqlObj"); - tscQueueAsyncError(fp, param); + terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY); return; } @@ -170,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo pRes->code = numOfRows; } - tscQueueAsyncError(pSql->fetchFp, param); + tscQueueAsyncError(pSql->fetchFp, param, pRes->code); return; } @@ -200,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); +// globalCode = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } @@ -210,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE); return; } @@ -232,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { tscError("sql object is NULL"); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); +// globalCode = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED); return; } @@ -242,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), if (pRes->qhandle == 0) { tscError("qhandle is NULL"); - tscQueueAsyncError(fp, param); + tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE); return; } @@ -331,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { // pCmd may be released, so cache pCmd->command int cmd = pCmd->command; - int code = pRes->code ? -pRes->code : pRes->numOfRows; + int code = pRes->code;// ? -pRes->code : pRes->numOfRows; // in case of async insert, restore the user specified callback function bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); @@ -349,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { } } -void tscProcessAsyncError(SSchedMsg *pMsg) { +static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; - - (*fp)(pMsg->thandle, NULL, -1); + (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg); } -void tscQueueAsyncError(void(*fp), void *param) { +void tscQueueAsyncError(void(*fp), void *param, int32_t code) { + int32_t* c = malloc(sizeof(int32_t)); + *c = code; + SSchedMsg schedMsg; schedMsg.fp = tscProcessAsyncError; schedMsg.ahandle = fp; schedMsg.thandle = param; - schedMsg.msg = NULL; + schedMsg.msg = c; taosScheduleTask(tscQhandle, &schedMsg); } @@ -412,7 +414,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code != 0) { pRes->code = code; tscTrace("%p failed to renew tableMeta", pSql); - tsem_post(&pSql->rspSem); +// tsem_post(&pSql->rspSem); } else { tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d", pSql, pSql->cmd.command, pSql->res.code, pSql->retry); @@ -424,7 +426,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { code = tscSendMsgToServer(pSql); if (code != 0) { pRes->code = code; - tsem_post(&pSql->rspSem); +// tsem_post(&pSql->rspSem); } } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 241f24a747..8b065fcf51 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { } tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); pSql->signature = pSql; pSql->pTscObj = pObj; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4db33b0291..5696611387 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -117,7 +117,7 @@ static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); -static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* pAst, int32_t* num, +static int32_t convertSyntaxTreeToExprTree(tExprNode **pExpr, tSQLExpr* pAst, int32_t* num, SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo); /* @@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pQueryInfo->numOfTables == 0) { pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); } else { - pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0]; + pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; } pCmd->command = pInfo->type; @@ -1208,12 +1208,12 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel SSqlBinaryExprInfo* pBinExprInfo = &pFuncExpr->binExprInfo; - tSQLSyntaxNode* pNode = NULL; + tExprNode* pNode = NULL; SColIndexEx* pColIndex = NULL; - int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pNode, pItem->pNode, &pBinExprInfo->numOfCols, &pColIndex, &pQueryInfo->exprsInfo); + int32_t ret = convertSyntaxTreeToExprTree(&pNode, pItem->pNode, &pBinExprInfo->numOfCols, &pColIndex, &pQueryInfo->exprsInfo); if (ret != TSDB_CODE_SUCCESS) { - tSQLBinaryExprDestroy(&pNode, NULL); + tExprTreeDestroy(&pNode, NULL); return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause"); } @@ -5807,20 +5807,20 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return TSDB_CODE_SUCCESS; // Does not build query message here } -static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* pAst, int32_t* num, +static int32_t convertSyntaxTreeToExprTree(tExprNode **pExpr, tSQLExpr* pAst, int32_t* num, SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo) { - tSQLSyntaxNode* pLeft = NULL; - tSQLSyntaxNode* pRight= NULL; + tExprNode* pLeft = NULL; + tExprNode* pRight= NULL; if (pAst->pLeft != NULL) { - int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pLeft, pAst->pLeft, num, pColIndex, pExprInfo); + int32_t ret = convertSyntaxTreeToExprTree(&pLeft, pAst->pLeft, num, pColIndex, pExprInfo); if (ret != TSDB_CODE_SUCCESS) { return ret; } } if (pAst->pRight != NULL) { - int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pRight, pAst->pRight, num, pColIndex, pExprInfo); + int32_t ret = convertSyntaxTreeToExprTree(&pRight, pAst->pRight, num, pColIndex, pExprInfo); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -5828,14 +5828,14 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* if (pAst->pLeft == NULL) { if (pAst->nSQLOptr >= TK_TINYINT && pAst->nSQLOptr <= TK_DOUBLE) { - *pExpr = calloc(1, sizeof(tSQLSyntaxNode) + sizeof(tVariant)); + *pExpr = calloc(1, sizeof(tExprNode) + sizeof(tVariant)); (*pExpr)->nodeType = TSQL_NODE_VALUE; - (*pExpr)->pVal = (tVariant*) ((char*)(*pExpr) + sizeof(tSQLSyntaxNode)); + (*pExpr)->pVal = (tVariant*) ((char*)(*pExpr) + sizeof(tExprNode)); tVariantAssign((*pExpr)->pVal, &pAst->val); } else if (pAst->nSQLOptr >= TK_COUNT && pAst->nSQLOptr <= TK_AVG_IRATE) { - *pExpr = calloc(1, sizeof(tSQLSyntaxNode) + sizeof(SSchemaEx)); + *pExpr = calloc(1, sizeof(tExprNode) + sizeof(SSchemaEx)); (*pExpr)->nodeType = TSQL_NODE_COL; - (*pExpr)->pSchema = (SSchema*)((char*)(*pExpr) + sizeof(tSQLSyntaxNode)); + (*pExpr)->pSchema = (SSchema*)((char*)(*pExpr) + sizeof(tExprNode)); strncpy((*pExpr)->pSchema->name, pAst->operand.z, pAst->operand.n); // set the input column data byte and type. @@ -5855,7 +5855,7 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* strncpy((*pColIndex)[(*num) - 1].name, pAst->operand.z, pAst->operand.n); } else { - *pExpr = (tSQLSyntaxNode *)calloc(1, sizeof(tSQLSyntaxNode)); + *pExpr = (tExprNode *)calloc(1, sizeof(tExprNode)); (*pExpr)->_node.hasPK = false; (*pExpr)->_node.pLeft = pLeft; (*pExpr)->_node.pRight = pRight; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e57f9cc410..f651c35324 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { pRes->rspType = rpcMsg->msgType; pRes->rspLen = rpcMsg->contLen; - char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); - if (tmp == NULL) { - pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - } else { - pRes->pRsp = tmp; - if (pRes->rspLen) { + if (pRes->rspLen > 0) { + char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); + if (tmp == NULL) { + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + } else { + pRes->pRsp = tmp; memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); } + } else { + pRes->pRsp = NULL; } // ignore the error information returned from mnode when set ignore flag in sql @@ -327,7 +329,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL; rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows; - tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres); + tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql); /* * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index bbf46a2353..f33e589c82 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -155,6 +155,10 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; assert(pObj != NULL && pObj->pSql != NULL); + if (code < 0) { + pObj->pSql->res.code = code; + } + sem_post(&pObj->pSql->rspSem); } @@ -177,17 +181,17 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha sem_wait(&pSql->rspSem); if (pSql->res.code != TSDB_CODE_SUCCESS) { + terrno = pSql->res.code; taos_close(pObj); return NULL; } tscTrace("%p DB connection is opening", pObj); - + // version compare only requires the first 3 segments of the version string int code = taosCheckVersion(version, taos_get_server_info(pObj), 3); if (code != 0) { - pSql->res.code = code; - + terrno = code; taos_close(pObj); return NULL; } else { @@ -267,31 +271,29 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { return pRes->code; } -static void syncQueryCallback(void *param, TAOS_RES *tres, int code) { - STscObj *pObj = (STscObj *)param; - assert(pObj != NULL && pObj->pSql != NULL); +static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { + assert(param != NULL); + SSqlObj *pSql = ((STscObj *)param)->pSql; - sem_post(&pObj->pSql->rspSem); + // valid error code is less than 0 + if (code < 0) { + pSql->res.code = code; + } + + sem_post(&pSql->rspSem); } int taos_query(TAOS *taos, const char *sqlstr) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { - globalCode = TSDB_CODE_DISCONNECTED; + terrno = TSDB_CODE_DISCONNECTED; return TSDB_CODE_DISCONNECTED; } - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - tscError("failed to malloc sqlObj"); - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } + SSqlObj* pSql = pObj->pSql; - pObj->pSql = pSql; - tsem_init(&pSql->rspSem, 0, 0); - - int32_t sqlLen = strlen(sqlstr); - doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen); + size_t sqlLen = strlen(sqlstr); + doAsyncQuery(pObj, pObj->pSql, waitForQueryRsp, taos, sqlstr, sqlLen); // wait for the callback function to post the semaphore sem_wait(&pSql->rspSem); @@ -649,12 +651,12 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { return pRes->tsrow; } -static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { +static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; + if (numOfRows < 0) { // set the error code pSql->res.code = -numOfRows; } - sem_post(&pSql->rspSem); } @@ -677,7 +679,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { // current data are exhausted, fetch more data if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { - taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); + taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); sem_wait(&pSql->rspSem); } @@ -754,20 +756,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if (pRes == NULL || pRes->qhandle == 0) { /* Query rsp is not received from vnode, so the qhandle is NULL */ tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); - if (pSql->fp != NULL) { - STscObj* pObj = pSql->pTscObj; - - if (pSql == pObj->pSql) { - pObj->pSql = NULL; - tscFreeSqlObj(pSql); - } - + + if (tscShouldFreeAsyncSqlObj(pSql)) { + tscFreeSqlObj(pSql); tscTrace("%p Async SqlObj is freed by app", pSql); - } else if (keepCmd) { - tscFreeSqlResult(pSql); } else { - tscFreeSqlObjPartial(pSql); + if (keepCmd) { + tscFreeSqlResult(pSql); + } else { + tscFreeSqlObjPartial(pSql); + } } + return; } @@ -793,7 +793,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { * be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport */ if (pRes->code != TSDB_CODE_QUERY_CANCELLED && - ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL) || + ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) || (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; @@ -836,39 +836,37 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { } } else { // if no free resource msg is sent to vnode, we free this object immediately. - - if (pSql->fp) { + bool free = tscShouldFreeAsyncSqlObj(pSql); + if (free) { assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); + tscFreeSqlObj(pSql); tscTrace("%p Async sql result is freed by app", pSql); - } else if (keepCmd) { - tscFreeSqlResult(pSql); - tscTrace("%p sql result is freed while sql command is kept", pSql); } else { - tscFreeSqlObjPartial(pSql); - tscTrace("%p sql result is freed", pSql); + if (keepCmd) { + tscFreeSqlResult(pSql); + tscTrace("%p sql result is freed while sql command is kept", pSql); + } else { + tscFreeSqlObjPartial(pSql); + tscTrace("%p sql result is freed by app", pSql); + } } } } void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); } +// todo should not be used in async query int taos_errno(TAOS *taos) { STscObj *pObj = (STscObj *)taos; - int code; - if (pObj == NULL || pObj->signature != pObj) return globalCode; + if (pObj == NULL || pObj->signature != pObj) { + return terrno; + } - if ((int8_t)(pObj->pSql->res.code) == -1) - code = TSDB_CODE_OTHERS; - else - code = pObj->pSql->res.code; - - return code; + return pObj->pSql->res.code; } -//static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; } - /* * In case of invalid sql error, additional information is attached to explain * why the sql is invalid @@ -888,13 +886,15 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { return z != NULL; } +// todo should not be used in async model char *taos_errstr(TAOS *taos) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) - return (char*)tstrerror(globalCode); + return (char*)tstrerror(terrno); - SSqlObj *pSql = pObj->pSql; + SSqlObj* pSql = pObj->pSql; + if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) { return pSql->cmd.payload; } else { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 616bcbba50..9d172d4ea5 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* pSql->sqlstr = sqlstr; tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); SSqlRes *pRes = &pSql->res; pRes->numOfRows = 1; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b0c7b68ab4..ca6ca369e4 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) { static void doQuitSubquery(SSqlObj* pParentSql) { freeSubqueryObj(pParentSql); - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); +// tsem_wait(&pParentSql->emptyRspSem); +// tsem_wait(&pParentSql->emptyRspSem); - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { @@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { freeSubqueryObj(pParentSql); } - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } else { tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); } @@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { } // wait for all subquery completed - tsem_wait(&pSql->rspSem); +// tsem_wait(&pSql->rspSem); // update the records for each subquery for(int32_t i = 0; i < pSql->numOfSubs; ++i) { @@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query if (pParentSql->fp == NULL) { - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); - - tsem_post(&pParentSql->rspSem); +// tsem_post(&pParentSql->rspSem); } else { // set the command flag must be after the semaphore been correctly set. // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; @@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { } } - tsem_post(&pSql->emptyRspSem); - tsem_wait(&pSql->rspSem); - - tsem_post(&pSql->emptyRspSem); +// tsem_wait(&pSql->rspSem); if (pSql->numOfSubs <= 0) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 7a585bfa68..ead8c09dec 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -40,15 +40,10 @@ void * tscQhandle; void * tscCheckDiskUsageTmr; int tsInsertHeadSize; -extern int tscEmbedded; -int tscNumOfThreads; -static pthread_once_t tscinit = PTHREAD_ONCE_INIT; -static pthread_mutex_t tscMutex; +int tscNumOfThreads; -extern int tsTscEnableRecordSql; -extern int tsNumOfLogLines; +static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void deltaToUtcInitOnce(); void tscCheckDiskUsage(void *para, void *unused) { taosGetDisk(); @@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { char secretEncrypt[32] = {0}; taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); - pthread_mutex_lock(&tscMutex); if (pVnodeConn == NULL) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsLocalIp; @@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { pVnodeConn = rpcOpen(&rpcInit); if (pVnodeConn == NULL) { tscError("failed to init connection to vnode"); - pthread_mutex_unlock(&tscMutex); return -1; } } @@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) { pTscMgmtConn = rpcOpen(&rpcInit); if (pTscMgmtConn == NULL) { tscError("failed to init connection to mgmt"); - pthread_mutex_unlock(&tscMutex); return -1; } } - pthread_mutex_unlock(&tscMutex); return 0; } @@ -113,7 +104,7 @@ void taos_init_imp() { char temp[128]; struct stat dirstat; - pthread_mutex_init(&tscMutex, NULL); + errno = TSDB_CODE_SUCCESS; srand(taosGetTimestampSec()); deltaToUtcInitOnce(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index ec6881db3f..5c7d8789ea 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { pSql->freed = 0; tscFreeSqlCmdData(pCmd); - tscTrace("%p free sqlObj partial completed", pSql); + tscTrace("%p partially free sqlObj completed", pSql); } void tscFreeSqlObj(SSqlObj* pSql) { @@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { tfree(pCmd->payload); pCmd->allocSize = 0; - - tsem_destroy(&pSql->rspSem); free(pSql); } @@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) { taosTmrStopA(&(pObj->pTimer)); tscFreeSqlObj(pSql); + sem_destroy(&pSql->rspSem); pthread_mutex_destroy(&pObj->mutex); + tscTrace("%p DB connection is closed", pObj); tfree(pObj); } @@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { if (pCmd->payload == NULL) { assert(pCmd->allocSize == 0); - pCmd->payload = (char*)malloc(size); + pCmd->payload = (char*)calloc(1, size); if (pCmd->payload == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY; pCmd->allocSize = size; - memset(pCmd->payload, 0, pCmd->allocSize); } else { if (pCmd->allocSize < size) { char* b = realloc(pCmd->payload, size); @@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { pCmd->payload = b; pCmd->allocSize = size; } + + memset(pCmd->payload, 0, pCmd->payloadLen); } //memset(pCmd->payload, 0, pCmd->allocSize); @@ -1105,7 +1106,7 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) { for(int32_t i = 0; i < pFieldInfo->numOfOutputCols; ++i) { if (pFieldInfo->pExpr[i] != NULL) { - tSQLBinaryExprDestroy(&pFieldInfo->pExpr[i]->binExprInfo.pBinExpr, NULL); + tExprTreeDestroy(&pFieldInfo->pExpr[i]->binExprInfo.pBinExpr, NULL); tfree(pFieldInfo->pExpr[i]->binExprInfo.pReqColumns); tfree(pFieldInfo->pExpr[i]); } @@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql) { + if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) { return false; } @@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST } pTableMetaInfo->pTableMeta = pTableMeta; -// pTableMetaInfo->pMetricMeta = pMetricMeta; pTableMetaInfo->numOfTags = numOfTags; if (tags != NULL) { @@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro } void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { - tscTrace("%p deref the metric/meter meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); + tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); int32_t index = pQueryInfo->numOfTables; while (index >= 0) { diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 1c3e3a8638..0a1f015912 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -286,6 +286,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); } else { // no further execution invoked, release the ref to vnode dnodeProcessReadResult(pVnode, pMsg); + dnodeReleaseVnode(pVnode); } } diff --git a/src/os/linux/src/tlinux.c b/src/os/linux/src/tlinux.c index bce4a8f13d..780e2903a0 100644 --- a/src/os/linux/src/tlinux.c +++ b/src/os/linux/src/tlinux.c @@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) { void (*callback)(int) = tharg; timer_t timerId; - struct sigevent sevent; + struct sigevent sevent = {0}; #ifdef _ALPINE sevent.sigev_notify = SIGEV_THREAD; diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index 51580445d5..616a2a46af 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -16,6 +16,7 @@ #ifndef TDENGINE_TAST_H #define TDENGINE_TAST_H +#include #ifdef __cplusplus extern "C" { #endif @@ -27,14 +28,14 @@ extern "C" { #include "taosdef.h" #include "tvariant.h" -struct tSQLSyntaxNode; +struct tExprNode; struct SSchema; struct tSkipList; struct tSkipListNode; enum { - TSQL_NODE_EXPR = 0x1, - TSQL_NODE_COL = 0x2, + TSQL_NODE_EXPR = 0x1, + TSQL_NODE_COL = 0x2, TSQL_NODE_VALUE = 0x4, }; @@ -60,44 +61,41 @@ typedef struct SBinaryFilterSupp { void * pExtInfo; } SBinaryFilterSupp; -typedef struct tSQLSyntaxNode { +typedef struct tExprNode { uint8_t nodeType; union { struct { - uint8_t optr; // filter operator - uint8_t hasPK; // 0: do not contain primary filter, 1: contain - void * info; // support filter operation on this expression only available for leaf node + uint8_t optr; // filter operator + uint8_t hasPK; // 0: do not contain primary filter, 1: contain + void * info; // support filter operation on this expression only available for leaf node - struct tSQLSyntaxNode *pLeft; // left child pointer - struct tSQLSyntaxNode *pRight; // right child pointer + struct tExprNode *pLeft; // left child pointer + struct tExprNode *pRight; // right child pointer } _node; struct SSchema *pSchema; tVariant * pVal; }; -} tSQLSyntaxNode; +} tExprNode; +void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len); -typedef struct tQueryResultset { - void ** pRes; - int64_t num; -} tQueryResultset; +void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len); -void tSQLBinaryExprFromString(tSQLSyntaxNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len); +void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*)); -void tSQLBinaryExprToString(tSQLSyntaxNode *pExpr, char *dst, int32_t *len); +void tSQLBinaryExprTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param); -void tSQLBinaryExprDestroy(tSQLSyntaxNode **pExprs, void (*fp)(void*)); - -void tSQLBinaryExprTraverse(tSQLSyntaxNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param); - -void tSQLBinaryExprCalcTraverse(tSQLSyntaxNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, +void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, char *(*cb)(void *, char *, int32_t)); -void tSQLBinaryExprTrv(tSQLSyntaxNode *pExprs, int32_t *val, int16_t *ids); -void tQueryResultClean(tQueryResultset *pRes); +void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids); uint8_t getBinaryExprOptr(SSQLToken *pToken); +SBuffer exprTreeToBinary(tExprNode* pExprTree); + +tExprNode* exprTreeFromBinary(const void* pBuf, size_t size); + #ifdef __cplusplus } #endif diff --git a/src/query/inc/qsqlparser.h b/src/query/inc/qsqlparser.h index 951caa8073..b1799b6902 100644 --- a/src/query/inc/qsqlparser.h +++ b/src/query/inc/qsqlparser.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_QASTDEF_H -#define TDENGINE_QASTDEF_H +#ifndef TDENGINE_QSQLPARSER_H +#define TDENGINE_QSQLPARSER_H #ifdef __cplusplus extern "C" { @@ -329,4 +329,4 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql); } #endif -#endif // TDENGINE_QASTDEF_H +#endif // TDENGINE_QSQLPARSER_H diff --git a/src/query/src/qast.c b/src/query/src/qast.c index 192a7f39c0..4524c5b249 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include "qast.h" -#include -#include -#include "../../client/inc/tschemautil.h" #include "os.h" + +#include "tutil.h" +#include "tbuffer.h" +#include "qast.h" #include "qsqlparser.h" #include "qsyntaxtreefunction.h" #include "taosdef.h" @@ -26,7 +26,10 @@ #include "tsqlfunction.h" #include "tstoken.h" #include "ttokendef.h" -#include "tutil.h" + +#include "../../client/inc/tschemautil.h" +#include "tarray.h" +#include "tskiplist.h" /* * @@ -39,22 +42,20 @@ * ver 0.3, pipeline filter in the form of: (a+2)/9 > 14 * */ +static tExprNode *tExprNodeCreate(SSchema *pSchema, int32_t numOfCols, SSQLToken *pToken); +static void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)); -static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, SSQLToken *pToken); -static void tSQLSyntaxNodeDestroy(tSQLSyntaxNode *pNode, void (*fp)(void *)); +static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i); +static void destroySyntaxTree(tExprNode *); -static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i); -static void destroySyntaxTree(tSQLSyntaxNode *); - -static uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tSQLSyntaxNode *pLeft, - const tSQLSyntaxNode *pRight); +static uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tExprNode *pLeft, const tExprNode *pRight); /* * Check the filter value type on the right hand side based on the column id on the left hand side, * the filter value type must be identical to field type for relational operation * As for binary arithmetic operation, it is not necessary to do so. */ -static void reviseBinaryExprIfNecessary(tSQLSyntaxNode **pLeft, tSQLSyntaxNode **pRight, uint8_t *optr) { +static void reviseBinaryExprIfNecessary(tExprNode **pLeft, tExprNode **pRight, uint8_t *optr) { if (*optr >= TSDB_RELATION_LESS && *optr <= TSDB_RELATION_LIKE) { // make sure that the type of data on both sides of relational comparision are identical if ((*pLeft)->nodeType == TSQL_NODE_VALUE) { @@ -79,7 +80,7 @@ static void reviseBinaryExprIfNecessary(tSQLSyntaxNode **pLeft, tSQLSyntaxNode * */ if ((*pLeft)->nodeType == TSQL_NODE_VALUE && (*pRight)->nodeType == TSQL_NODE_COL) { if (*optr >= TSDB_RELATION_LARGE && *optr <= TSDB_RELATION_LARGE_EQUAL && *optr != TSDB_RELATION_EQUAL) { - SWAP(*pLeft, *pRight, tSQLSyntaxNode *); + SWAP(*pLeft, *pRight, tExprNode *); } switch (*optr) { @@ -101,15 +102,15 @@ static void reviseBinaryExprIfNecessary(tSQLSyntaxNode **pLeft, tSQLSyntaxNode * } } -static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, SSQLToken *pToken) { +static tExprNode *tExprNodeCreate(SSchema *pSchema, int32_t numOfCols, SSQLToken *pToken) { /* if the token is not a value, return false */ if (pToken->type == TK_RP || (pToken->type != TK_INTEGER && pToken->type != TK_FLOAT && pToken->type != TK_ID && pToken->type != TK_TBNAME && pToken->type != TK_STRING && pToken->type != TK_BOOL)) { return NULL; } - size_t nodeSize = sizeof(tSQLSyntaxNode); - tSQLSyntaxNode *pNode = NULL; + size_t nodeSize = sizeof(tExprNode); + tExprNode *pNode = NULL; if (pToken->type == TK_ID || pToken->type == TK_TBNAME) { int32_t i = 0; @@ -130,7 +131,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, nodeSize += sizeof(SSchema); pNode = calloc(1, nodeSize); - pNode->pSchema = (struct SSchema *)((char *)pNode + sizeof(tSQLSyntaxNode)); + pNode->pSchema = (struct SSchema *)((char *)pNode + sizeof(tExprNode)); pNode->nodeType = TSQL_NODE_COL; if (pToken->type == TK_ID) { @@ -145,7 +146,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, } else { nodeSize += sizeof(tVariant); pNode = calloc(1, nodeSize); - pNode->pVal = (tVariant *)((char *)pNode + sizeof(tSQLSyntaxNode)); + pNode->pVal = (tVariant *)((char *)pNode + sizeof(tExprNode)); toTSDBType(pToken->type); tVariantCreate(pNode->pVal, pToken); @@ -191,21 +192,21 @@ uint8_t getBinaryExprOptr(SSQLToken *pToken) { } // previous generated expr is reduced as the left child -static tSQLSyntaxNode *parseRemainStr(char *pstr, tSQLSyntaxNode *pExpr, SSchema *pSchema, int32_t optr, +static tExprNode *parseRemainStr(char *pstr, tExprNode *pExpr, SSchema *pSchema, int32_t optr, int32_t numOfCols, int32_t *i) { // set the previous generated node as the left child of new root pExpr->nodeType = TSQL_NODE_EXPR; // remain is the right child - tSQLSyntaxNode *pRight = createSyntaxTree(pSchema, numOfCols, pstr, i); + tExprNode *pRight = createSyntaxTree(pSchema, numOfCols, pstr, i); if (pRight == NULL || (pRight->nodeType == TSQL_NODE_COL && pExpr->nodeType != TSQL_NODE_VALUE) || (pExpr->nodeType == TSQL_NODE_VALUE && pRight->nodeType != TSQL_NODE_COL)) { - tSQLSyntaxNodeDestroy(pExpr, NULL); - tSQLSyntaxNodeDestroy(pRight, NULL); + tExprNodeDestroy(pExpr, NULL); + tExprNodeDestroy(pRight, NULL); return NULL; } - tSQLSyntaxNode *pNewExpr = (tSQLSyntaxNode *)calloc(1, sizeof(tSQLSyntaxNode)); + tExprNode *pNewExpr = (tExprNode *)calloc(1, sizeof(tExprNode)); uint8_t k = optr; reviseBinaryExprIfNecessary(&pExpr, &pRight, &k); pNewExpr->_node.pLeft = pExpr; @@ -218,7 +219,7 @@ static tSQLSyntaxNode *parseRemainStr(char *pstr, tSQLSyntaxNode *pExpr, SSchema return pNewExpr; } -uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tSQLSyntaxNode *pLeft, const tSQLSyntaxNode *pRight) { +uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tExprNode *pLeft, const tExprNode *pRight) { if (pLeft->nodeType == TSQL_NODE_COL) { // if left node is the primary column,return true return (strcmp(primaryColumnName, pLeft->pSchema->name) == 0) ? 1 : 0; @@ -231,20 +232,21 @@ uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tSQLSyntaxNode } } -static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i) { +static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i) { SSQLToken t0 = tStrGetToken(str, i, false, 0, NULL); if (t0.n == 0) { return NULL; } - tSQLSyntaxNode *pLeft = NULL; + tExprNode *pLeft = NULL; if (t0.type == TK_LP) { // start new left child branch pLeft = createSyntaxTree(pSchema, numOfCols, str, i); } else { if (t0.type == TK_RP) { return NULL; } - pLeft = tSQLSyntaxNodeCreate(pSchema, numOfCols, &t0); + + pLeft = tExprNodeCreate(pSchema, numOfCols, &t0); } if (pLeft == NULL) { @@ -254,7 +256,7 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha t0 = tStrGetToken(str, i, false, 0, NULL); if (t0.n == 0 || t0.type == TK_RP) { if (pLeft->nodeType != TSQL_NODE_EXPR) { // if left is not the expr, it is not a legal expr - tSQLSyntaxNodeDestroy(pLeft, NULL); + tExprNodeDestroy(pLeft, NULL); return NULL; } @@ -265,12 +267,12 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha uint8_t optr = getBinaryExprOptr(&t0); if (optr == 0) { pError("not support binary operator:%d", t0.type); - tSQLSyntaxNodeDestroy(pLeft, NULL); + tExprNodeDestroy(pLeft, NULL); return NULL; } assert(pLeft != NULL); - tSQLSyntaxNode *pRight = NULL; + tExprNode *pRight = NULL; if (t0.type == TK_AND || t0.type == TK_OR || t0.type == TK_LP) { pRight = createSyntaxTree(pSchema, numOfCols, str, i); @@ -283,51 +285,51 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha */ t0 = tStrGetToken(str, i, true, 0, NULL); if (t0.n == 0) { - tSQLSyntaxNodeDestroy(pLeft, NULL); // illegal expression + tExprNodeDestroy(pLeft, NULL); // illegal expression return NULL; } if (t0.type == TK_LP) { pRight = createSyntaxTree(pSchema, numOfCols, str, i); } else { - pRight = tSQLSyntaxNodeCreate(pSchema, numOfCols, &t0); + pRight = tExprNodeCreate(pSchema, numOfCols, &t0); } } if (pRight == NULL) { - tSQLSyntaxNodeDestroy(pLeft, NULL); + tExprNodeDestroy(pLeft, NULL); return NULL; } /* create binary expr as the child of new parent node */ - tSQLSyntaxNode *pBinExpr = (tSQLSyntaxNode *)calloc(1, sizeof(tSQLSyntaxNode)); + tExprNode *pExpr = (tExprNode *)calloc(1, sizeof(tExprNode)); reviseBinaryExprIfNecessary(&pLeft, &pRight, &optr); - pBinExpr->_node.hasPK = isQueryOnPrimaryKey(pSchema[0].name, pLeft, pRight); - pBinExpr->_node.pLeft = pLeft; - pBinExpr->_node.pRight = pRight; - pBinExpr->_node.optr = optr; + pExpr->_node.hasPK = isQueryOnPrimaryKey(pSchema[0].name, pLeft, pRight); + pExpr->_node.pLeft = pLeft; + pExpr->_node.pRight = pRight; + pExpr->_node.optr = optr; t0 = tStrGetToken(str, i, true, 0, NULL); if (t0.n == 0 || t0.type == TK_RP) { - tSQLSyntaxNode *pn = malloc(sizeof(tSQLSyntaxNode)); - pBinExpr->nodeType = TSQL_NODE_EXPR; - return pBinExpr; + pExpr->nodeType = TSQL_NODE_EXPR; + return pExpr; } else { uint8_t localOptr = getBinaryExprOptr(&t0); if (localOptr == 0) { pError("not support binary operator:%d", t0.type); - free(pBinExpr); + free(pExpr); return NULL; } - return parseRemainStr(str, pBinExpr, pSchema, localOptr, numOfCols, i); + return parseRemainStr(str, pExpr, pSchema, localOptr, numOfCols, i); } } -void tSQLBinaryExprFromString(tSQLSyntaxNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len) { +void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len) { *pExpr = NULL; + if (len <= 0 || src == NULL || pSchema == NULL || numOfCols <= 0) { return; } @@ -340,7 +342,7 @@ void tSQLBinaryExprFromString(tSQLSyntaxNode **pExpr, SSchema *pSchema, int32_t } } -int32_t tSQLBinaryExprToStringImpl(tSQLSyntaxNode *pNode, char *dst, uint8_t type) { +int32_t tSQLBinaryExprToStringImpl(tExprNode *pNode, char *dst, uint8_t type) { int32_t len = 0; if (type == TSQL_NODE_EXPR) { *dst = '('; @@ -406,7 +408,7 @@ static char *tSQLOptrToString(uint8_t optr, char *dst) { return dst; } -void tSQLBinaryExprToString(tSQLSyntaxNode *pExpr, char *dst, int32_t *len) { +void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len) { if (pExpr == NULL) { *dst = 0; *len = 0; @@ -423,32 +425,41 @@ void tSQLBinaryExprToString(tSQLSyntaxNode *pExpr, char *dst, int32_t *len) { *len += tSQLBinaryExprToStringImpl(pExpr->_node.pRight, start, pExpr->_node.pRight->nodeType); } -static void UNUSED_FUNC destroySyntaxTree(tSQLSyntaxNode *pNode) { tSQLSyntaxNodeDestroy(pNode, NULL); } +static void UNUSED_FUNC destroySyntaxTree(tExprNode *pNode) { tExprNodeDestroy(pNode, NULL); } -static void tSQLSyntaxNodeDestroy(tSQLSyntaxNode *pNode, void (*fp)(void *)) { +static void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) { if (pNode == NULL) { return; } if (pNode->nodeType == TSQL_NODE_EXPR) { - tSQLBinaryExprDestroy(&pNode, fp); + tExprTreeDestroy(&pNode, fp); } else if (pNode->nodeType == TSQL_NODE_VALUE) { tVariantDestroy(pNode->pVal); + } else if (pNode->nodeType == TSQL_NODE_COL) { + free(pNode->pSchema); } free(pNode); } -void tSQLBinaryExprDestroy(tSQLSyntaxNode **pExpr, void (*fp)(void *)) { +void tExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) { if (*pExpr == NULL) { return; } - - tSQLSyntaxNodeDestroy((*pExpr)->_node.pLeft, fp); - tSQLSyntaxNodeDestroy((*pExpr)->_node.pRight, fp); - - if (fp != NULL) { - fp((*pExpr)->_node.info); + + if ((*pExpr)->nodeType == TSQL_NODE_EXPR) { + tExprTreeDestroy(&(*pExpr)->_node.pLeft, fp); + tExprTreeDestroy(&(*pExpr)->_node.pRight, fp); + + if (fp != NULL) { + fp((*pExpr)->_node.info); + } + } else if ((*pExpr)->nodeType == TSQL_NODE_VALUE) { + tVariantDestroy((*pExpr)->pVal); + free((*pExpr)->pVal); + } else if ((*pExpr)->nodeType == TSQL_NODE_COL) { + free((*pExpr)->pSchema); } free(*pExpr); @@ -559,7 +570,7 @@ void tSQLBinaryExprDestroy(tSQLSyntaxNode **pExpr, void (*fp)(void *)) { // DEFAULT_COMP(p1, p2); //} -int32_t merge(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset *pFinalRes) { +int32_t merge(SArray *pLeft, SArray *pRight, SArray *pFinalRes) { // assert(pFinalRes->pRes == 0); // // pFinalRes->pRes = calloc((size_t)(pLeft->num + pRight->num), POINTER_BYTES); @@ -600,7 +611,7 @@ int32_t merge(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset * return 0; } -int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset *pFinalRes) { +int32_t intersect(SArray *pLeft, SArray *pRight, SArray *pFinalRes) { // int64_t num = MIN(pLeft->num, pRight->num); // // assert(pFinalRes->pRes == 0); @@ -638,25 +649,26 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults /* * traverse the result and apply the function to each item to check if the item is qualified or not */ -static UNUSED_FUNC void tSQLListTraverseOnResult(struct tSQLSyntaxNode *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { - assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE); - - // brutal force scan the result list and check for each item in the list - int64_t num = pResult->num; - for (int32_t i = 0, j = 0; i < pResult->num; ++i) { - if (fp == NULL || (fp(pResult->pRes[i], pExpr->_node.info) == true)) { - pResult->pRes[j++] = pResult->pRes[i]; - } else { - num--; - } - } - - pResult->num = num; +static UNUSED_FUNC void tSQLListTraverseOnResult(struct tExprNode *pExpr, __result_filter_fn_t fp, SArray *pResult) { +// assert(pExpr->_node.pLeft->nodeType == TSQL_NODE_COL && pExpr->_node.pRight->nodeType == TSQL_NODE_VALUE); +// +// // brutal force scan the result list and check for each item in the list +// int64_t num = pResult->num; +// for (int32_t i = 0, j = 0; i < pResult->num; ++i) { +// if (fp == NULL || (fp(pResult->pRes[i], pExpr->_node.info) == true)) { +// pResult->pRes[j++] = pResult->pRes[i]; +// } else { +// num--; +// } +// } +// +// pResult->num = num; + assert(0); } -static bool filterItem(tSQLSyntaxNode *pExpr, const void *pItem, SBinaryFilterSupp *param) { - tSQLSyntaxNode *pLeft = pExpr->_node.pLeft; - tSQLSyntaxNode *pRight = pExpr->_node.pRight; +static bool filterItem(tExprNode *pExpr, const void *pItem, SBinaryFilterSupp *param) { + tExprNode *pLeft = pExpr->_node.pLeft; + tExprNode *pRight = pExpr->_node.pRight; /* * non-leaf nodes, recursively traverse the syntax tree in the post-root order @@ -695,7 +707,7 @@ static bool filterItem(tSQLSyntaxNode *pExpr, const void *pItem, SBinaryFilterSu * @param pSchema tag schemas * @param fp filter callback function */ -static void tSQLBinaryTraverseOnResult(tSQLSyntaxNode *pExpr, SArray *pResult, SBinaryFilterSupp *param) { +static void tSQLBinaryTraverseOnResult(tExprNode *pExpr, SArray *pResult, SBinaryFilterSupp *param) { size_t size = taosArrayGetSize(pResult); SArray* array = taosArrayInit(size, POINTER_BYTES); @@ -710,7 +722,7 @@ static void tSQLBinaryTraverseOnResult(tSQLSyntaxNode *pExpr, SArray *pResult, S taosArrayCopy(pResult, array); } -static void tSQLBinaryTraverseOnSkipList(tSQLSyntaxNode *pExpr, SArray *pResult, SSkipList *pSkipList, +static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList, SBinaryFilterSupp *param) { SSkipListIterator* iter = tSkipListCreateIter(pSkipList); @@ -724,13 +736,13 @@ static void tSQLBinaryTraverseOnSkipList(tSQLSyntaxNode *pExpr, SArray *pResult, } // post-root order traverse syntax tree -void tSQLBinaryExprTraverse(tSQLSyntaxNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param) { +void tSQLBinaryExprTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param) { if (pExpr == NULL) { return; } - tSQLSyntaxNode *pLeft = pExpr->_node.pLeft; - tSQLSyntaxNode *pRight = pExpr->_node.pRight; + tExprNode *pLeft = pExpr->_node.pLeft; + tExprNode *pRight = pExpr->_node.pRight; // recursive traverse left child branch if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) { @@ -751,22 +763,22 @@ void tSQLBinaryExprTraverse(tSQLSyntaxNode *pExpr, SSkipList *pSkipList, SArray assert(taosArrayGetSize(result) == 0); tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param); } else if (weight == 2 || (weight == 1 && pExpr->_node.optr == TSDB_RELATION_OR)) { - tQueryResultset rLeft = {0}; - tQueryResultset rRight = {0}; + SArray* rLeft = taosArrayInit(10, POINTER_BYTES); + SArray* rRight = taosArrayInit(10, POINTER_BYTES); - tSQLBinaryExprTraverse(pLeft, pSkipList, &rLeft, param); - tSQLBinaryExprTraverse(pRight, pSkipList, &rRight, param); + tSQLBinaryExprTraverse(pLeft, pSkipList, rLeft, param); + tSQLBinaryExprTraverse(pRight, pSkipList, rRight, param); if (pExpr->_node.optr == TSDB_RELATION_AND) { // CROSS - intersect(&rLeft, &rRight, result); + intersect(rLeft, rRight, result); } else if (pExpr->_node.optr == TSDB_RELATION_OR) { // or - merge(&rLeft, &rRight, result); + merge(rLeft, rRight, result); } else { assert(false); } - free(rLeft.pRes); - free(rRight.pRes); + taosArrayDestroy(rLeft); + taosArrayDestroy(rRight); } else { /* * (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here @@ -776,8 +788,8 @@ void tSQLBinaryExprTraverse(tSQLSyntaxNode *pExpr, SSkipList *pSkipList, SArray */ assert(pExpr->_node.optr == TSDB_RELATION_AND); - tSQLSyntaxNode *pFirst = NULL; - tSQLSyntaxNode *pSecond = NULL; + tExprNode *pFirst = NULL; + tExprNode *pSecond = NULL; if (pLeft->_node.hasPK == 1) { pFirst = pLeft; pSecond = pRight; @@ -810,14 +822,14 @@ void tSQLBinaryExprTraverse(tSQLSyntaxNode *pExpr, SSkipList *pSkipList, SArray } } -void tSQLBinaryExprCalcTraverse(tSQLSyntaxNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, +void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, char *(*getSourceDataBlock)(void *, char *, int32_t)) { if (pExprs == NULL) { return; } - tSQLSyntaxNode *pLeft = pExprs->_node.pLeft; - tSQLSyntaxNode *pRight = pExprs->_node.pRight; + tExprNode *pLeft = pExprs->_node.pLeft; + tExprNode *pRight = pExprs->_node.pRight; /* the left output has result from the left child syntax tree */ char *pLeftOutput = (char*)malloc(sizeof(int64_t) * numOfRows); @@ -890,13 +902,13 @@ void tSQLBinaryExprCalcTraverse(tSQLSyntaxNode *pExprs, int32_t numOfRows, char free(pRightOutput); } -void tSQLBinaryExprTrv(tSQLSyntaxNode *pExprs, int32_t *val, int16_t *ids) { +void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids) { if (pExprs == NULL) { return; } - tSQLSyntaxNode *pLeft = pExprs->_node.pLeft; - tSQLSyntaxNode *pRight = pExprs->_node.pRight; + tExprNode *pLeft = pExprs->_node.pLeft; + tExprNode *pRight = pExprs->_node.pRight; // recursive traverse left child branch if (pLeft->nodeType == TSQL_NODE_EXPR) { @@ -914,11 +926,97 @@ void tSQLBinaryExprTrv(tSQLSyntaxNode *pExprs, int32_t *val, int16_t *ids) { } } -void tQueryResultClean(tQueryResultset *pRes) { - if (pRes == NULL) { - return; +static int32_t exprTreeToBinaryImpl(tExprNode* pExprTree, SBuffer* pBuf) { + tbufWrite(pBuf, &pExprTree->nodeType, sizeof(pExprTree->nodeType)); + + if (pExprTree->nodeType == TSQL_NODE_VALUE) { + tVariant* pVal = pExprTree->pVal; + + tbufWrite(pBuf, &pVal->nType, sizeof(pVal->nType)); + if (pVal->nType == TSDB_DATA_TYPE_BINARY) { + tbufWrite(pBuf, &pVal->nLen, sizeof(pVal->nLen)); + tbufWrite(pBuf, pVal->pz, pVal->nLen); + } else { + tbufWrite(pBuf, &pVal->pz, sizeof(pVal->i64Key)); + } + + } else if (pExprTree->nodeType == TSQL_NODE_COL) { + SSchema* pSchema = pExprTree->pSchema; + tbufWrite(pBuf, &pSchema->colId, sizeof(pSchema->colId)); + tbufWrite(pBuf, &pSchema->bytes, sizeof(pSchema->bytes)); + tbufWrite(pBuf, &pSchema->type, sizeof(pSchema->type)); + + int32_t len = strlen(pSchema->name); + tbufWriteStringLen(pBuf, pSchema->name, len); + + } else if (pExprTree->nodeType == TSQL_NODE_EXPR) { + tbufWrite(pBuf, &pExprTree->_node.optr, sizeof(pExprTree->_node.optr)); + tbufWrite(pBuf, &pExprTree->_node.hasPK, sizeof(pExprTree->_node.hasPK)); + + exprTreeToBinaryImpl(pExprTree->_node.pLeft, pBuf); + exprTreeToBinaryImpl(pExprTree->_node.pRight, pBuf); } - - tfree(pRes->pRes); - pRes->num = 0; } + +SBuffer exprTreeToBinary(tExprNode* pExprTree) { + SBuffer buf = {0}; + if (pExprTree == NULL) { + return buf; + } + + int32_t code = tbufBeginWrite(&buf); + if (code != 0) { + return buf; + } + + exprTreeToBinaryImpl(pExprTree, &buf); + return buf; +} + +static tExprNode* exprTreeFromBinaryImpl(tExprNode** pExprTree, SBuffer* pBuf) { + tExprNode* pExpr = calloc(1, sizeof(tExprNode)); + tbufReadToBuffer(pBuf, &pExpr->nodeType, sizeof(pExpr->nodeType)); + + if (pExpr->nodeType == TSQL_NODE_VALUE) { + tVariant* pVal = calloc(1, sizeof(tVariant)); + + tbufReadToBuffer(pBuf, &pVal->nType, sizeof(pVal->nType)); + if (pVal->nType == TSDB_DATA_TYPE_BINARY) { + tbufReadToBuffer(pBuf, &pVal->nLen, sizeof(pVal->nLen)); + pVal->pz = calloc(1, pVal->nLen + 1); + tbufReadToBuffer(pBuf, pVal->pz, pVal->nLen); + } else { + tbufReadToBuffer(pBuf, &pVal->pz, sizeof(pVal->i64Key)); + } + + pExpr->pVal = pVal; + } else if (pExpr->nodeType == TSQL_NODE_COL) { + SSchema* pSchema = calloc(1, sizeof(SSchema)); + tbufReadToBuffer(pBuf, &pSchema->colId, sizeof(pSchema->colId)); + tbufReadToBuffer(pBuf, &pSchema->bytes, sizeof(pSchema->bytes)); + tbufReadToBuffer(pBuf, &pSchema->type, sizeof(pSchema->type)); + + tbufReadToString(pBuf, pSchema->name, TSDB_COL_NAME_LEN); + + pExpr->pSchema = pSchema; + } else if (pExpr->nodeType == TSQL_NODE_EXPR) { + tbufReadToBuffer(pBuf, &pExpr->_node.optr, sizeof(pExpr->_node.optr)); + tbufReadToBuffer(pBuf, &pExpr->_node.hasPK, sizeof(pExpr->_node.hasPK)); + + exprTreeFromBinaryImpl(&pExpr->_node.pLeft, pBuf); + exprTreeFromBinaryImpl(&pExpr->_node.pRight, pBuf); + + assert(pExpr->_node.pLeft != NULL && pExpr->_node.pRight != NULL); + } + + *pExprTree = pExpr; +} + +tExprNode* exprTreeFromBinary(const void* pBuf, size_t size) { + SBuffer rbuf = {0}; + int32_t code = tbufBeginRead(&rbuf, pBuf, size); + + tExprNode* pExprNode = NULL; + exprTreeFromBinaryImpl(&pExprNode, &rbuf); + return pExprNode; +} \ No newline at end of file diff --git a/src/query/src/qtokenizer.c b/src/query/src/qtokenizer.c index 61d2e59c87..51b196a9da 100644 --- a/src/query/src/qtokenizer.c +++ b/src/query/src/qtokenizer.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ +#include "os.h" + #include "hash.h" #include "hashfunc.h" -#include "os.h" -#include "shash.h" #include "taosdef.h" #include "tstoken.h" #include "ttokendef.h" diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index a32c8638ce..932c043377 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2609,7 +2609,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle; - while (tsdbNextDataBlock(pQueryHandle)) { // check if query is killed or not set the status of query to pass the status check if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { @@ -5149,7 +5148,7 @@ static void singleTableQueryImpl(SQInfo* pQInfo) { int64_t st = taosGetTimestampUs(); // group by normal column, sliding window query, interval query are handled by interval query processor - if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) + if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) tableIntervalProcessor(pQInfo); } else { if (isFixedOutputQuery(pQuery)) { @@ -5461,7 +5460,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs SSqlBinaryExprInfo *pBinaryExprInfo = &pExpr->binExprInfo; SColumnInfo * pColMsg = pQueryMsg->colList; #if 0 - tSQLSyntaxNode* pBinExpr = NULL; + tExprNode* pBinExpr = NULL; SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols); dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz); @@ -5962,7 +5961,7 @@ static void freeQInfo(SQInfo *pQInfo) { if (pBinExprInfo->numOfCols > 0) { tfree(pBinExprInfo->pReqColumns); - tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL); + tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL); } } diff --git a/src/query/src/queryUtil.c b/src/query/src/queryUtil.c index d954c10849..17410b2868 100644 --- a/src/query/src/queryUtil.c +++ b/src/query/src/queryUtil.c @@ -24,6 +24,7 @@ #include "ttime.h" #include "queryExecutor.h" +#include "queryUtil.h" int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, int32_t threshold, int16_t type) { diff --git a/src/query/tests/astTest.cpp b/src/query/tests/astTest.cpp new file mode 100644 index 0000000000..ac10cd2429 --- /dev/null +++ b/src/query/tests/astTest.cpp @@ -0,0 +1,630 @@ +#include +#include +#include +#include +#include + +#include "qast.h" +#include "taosmsg.h" +#include "tsdb.h" +#include "tskiplist.h" + +typedef struct ResultObj { + int32_t numOfResult; + char * resultName[64]; +} ResultObj; + +static void initSchema(SSchema *pSchema, int32_t numOfCols); + +static void initSchema_binary(SSchema *schema, int32_t numOfCols); + +static tSkipList *createSkipList(SSchema *pSchema, int32_t numOfTags); +static tSkipList *createSkipList_binary(SSchema *pSchema, int32_t numOfTags); + +static void testQueryStr(SSchema *schema, int32_t numOfCols, char *sql, tSkipList *pSkipList, ResultObj *expectedVal); + +static void dropMeter(tSkipList *pSkipList); + +static void Right2LeftTest(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList); + +static void Left2RightTest(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList); + +static void IllegalExprTest(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList); + +static void Left2RightTest_binary(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList); +static void Right2LeftTest_binary(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList); + +void setValue(ResultObj *pResult, int32_t num, char **val) { + pResult->numOfResult = num; + for (int32_t i = 0; i < num; ++i) { + pResult->resultName[i] = val[i]; + } +} + +static void initSchema_binary(SSchema *schema, int32_t numOfCols) { + schema[0].type = TSDB_DATA_TYPE_BINARY; + schema[0].bytes = 8; + strcpy(schema[0].name, "a"); + + schema[1].type = TSDB_DATA_TYPE_DOUBLE; + schema[1].bytes = 8; + strcpy(schema[1].name, "b"); + + schema[2].type = TSDB_DATA_TYPE_INT; + schema[2].bytes = 20; + strcpy(schema[2].name, "c"); + + schema[3].type = TSDB_DATA_TYPE_BIGINT; + schema[3].bytes = 8; + strcpy(schema[3].name, "d"); + + schema[4].type = TSDB_DATA_TYPE_SMALLINT; + schema[4].bytes = 2; + strcpy(schema[4].name, "e"); + + schema[5].type = TSDB_DATA_TYPE_TINYINT; + schema[5].bytes = 1; + strcpy(schema[5].name, "f"); + + schema[6].type = TSDB_DATA_TYPE_FLOAT; + schema[6].bytes = 4; + strcpy(schema[6].name, "g"); + + schema[7].type = TSDB_DATA_TYPE_BOOL; + schema[7].bytes = 1; + strcpy(schema[7].name, "h"); +} + +static void initSchema(SSchema *schema, int32_t numOfCols) { + schema[0].type = TSDB_DATA_TYPE_INT; + schema[0].bytes = 8; + strcpy(schema[0].name, "a"); + + schema[1].type = TSDB_DATA_TYPE_DOUBLE; + schema[1].bytes = 8; + strcpy(schema[1].name, "b"); + + schema[2].type = TSDB_DATA_TYPE_BINARY; + schema[2].bytes = 20; + strcpy(schema[2].name, "c"); + + schema[3].type = TSDB_DATA_TYPE_BIGINT; + schema[3].bytes = 8; + strcpy(schema[3].name, "d"); + + schema[4].type = TSDB_DATA_TYPE_SMALLINT; + schema[4].bytes = 2; + strcpy(schema[4].name, "e"); + + schema[5].type = TSDB_DATA_TYPE_TINYINT; + schema[5].bytes = 1; + strcpy(schema[5].name, "f"); + + schema[6].type = TSDB_DATA_TYPE_FLOAT; + schema[6].bytes = 4; + strcpy(schema[6].name, "g"); + + schema[7].type = TSDB_DATA_TYPE_BOOL; + schema[7].bytes = 1; + strcpy(schema[7].name, "h"); +} + +// static void addOneNode(SSchema *pSchema, int32_t tagsLen, tSkipList *pSkipList, +// char *meterId, int32_t a, double b, char *c, int64_t d, int16_t e, int8_t f, float g, +// bool h, int32_t numOfTags) { +// STabObj *pMeter = calloc(1, sizeof(STabObj)); +// pMeter->numOfTags = numOfTags; +// pMeter->pTagData = calloc(1, tagsLen + TSDB_METER_ID_LEN); +// strcpy(pMeter->meterId, meterId); +// +// char *tags = pMeter->pTagData + TSDB_METER_ID_LEN; +// int32_t offset = 0; +// +// *(int32_t *) tags = a; +// +// offset += pSchema[0].bytes; +// *(double *) (tags + offset) = b; +// +// offset += pSchema[1].bytes; +// memcpy(tags + offset, c, 3); +// +// offset += pSchema[2].bytes; +// *(int64_t *) (tags + offset) = d; +// +// offset += pSchema[3].bytes; +// *(int16_t *) (tags + offset) = e; +// +// offset += pSchema[4].bytes; +// *(int8_t *) (tags + offset) = f; +// +// offset += pSchema[5].bytes; +// *(float *) (tags + offset) = g; +// +// offset += pSchema[6].bytes; +// *(int8_t *) (tags + offset) = h ? 1 : 0; +// +// tSkipListKey pKey = tSkipListCreateKey(pSchema[0].type, tags, pSchema[0].bytes); +// tSkipListPut(pSkipList, pMeter, &pKey, 1); +//} +// +// static void addOneNode_binary(SSchema *pSchema, int32_t tagsLen, tSkipList *pSkipList, +// char *meterId, int32_t a, double b, char *c, int64_t d, int16_t e, int8_t f, float g, +// bool h, int32_t numOfTags) { +// STabObj *pMeter = calloc(1, sizeof(STabObj)); +// pMeter->numOfTags = numOfTags; +// pMeter->pTagData = calloc(1, tagsLen + TSDB_METER_ID_LEN); +// strcpy(pMeter->meterId, meterId); +// +// char *tags = pMeter->pTagData + TSDB_METER_ID_LEN; +// int32_t offset = 0; +// memcpy(tags, c, pSchema[0].bytes); +// +// offset += pSchema[0].bytes; +// *(double *) (tags + offset) = b; +// +// offset += pSchema[1].bytes; +// *(int32_t *) (tags + offset) = a; +// +// offset += pSchema[2].bytes; +// *(int64_t *) (tags + offset) = d; +// +// offset += pSchema[3].bytes; +// *(int16_t *) (tags + offset) = e; +// +// offset += pSchema[4].bytes; +// *(int8_t *) (tags + offset) = f; +// +// offset += pSchema[5].bytes; +// *(float *) (tags + offset) = g; +// +// offset += pSchema[6].bytes; +// *(int8_t *) (tags + offset) = h ? 1 : 0; +// +// tSkipListKey pKey = tSkipListCreateKey(pSchema[0].type, tags, pSchema[0].bytes); +// tSkipListPut(pSkipList, pMeter, &pKey, 1); +// tSkipListDestroyKey(&pKey); +//} + +// static void dropMeter(tSkipList *pSkipList) { +// tSkipListNode **pRes = NULL; +// int32_t num = tSkipListIterateList(pSkipList, &pRes, NULL, NULL); +// for (int32_t i = 0; i < num; ++i) { +// tSkipListNode *pNode = pRes[i]; +// STabObj *pMeter = (STabObj *) pNode->pData; +// free(pMeter->pTagData); +// free(pMeter); +// pNode->pData = NULL; +// } +// free(pRes); +//} + +// static tSkipList *createSkipList(SSchema *pSchema, int32_t numOfTags) { +// int32_t tagsLen = 0; +// for (int32_t i = 0; i < numOfTags; ++i) { +// tagsLen += pSchema[i].bytes; +// } +// +// tSkipList *pSkipList = tSkipListCreate(10, pSchema[0].type, 4); +// +// addOneNode(pSchema, tagsLen, pSkipList, "tm0\0", 0, 10.5, "abc", 1000, -10000, -20, 1.0, true, 8); +// addOneNode(pSchema, tagsLen, pSkipList, "tm1\0", 1, 20.5, "def", 1100, -10500, -30, 2.0, false, 8); +// addOneNode(pSchema, tagsLen, pSkipList, "tm2\0", 2, 30.5, "ghi", 1200, -11000, -40, 3.0, true, 8); +// addOneNode(pSchema, tagsLen, pSkipList, "tm3\0", 3, 40.5, "jkl", 1300, -11500, -50, 4.0, false, 8); +// addOneNode(pSchema, tagsLen, pSkipList, "tm4\0", 4, 50.5, "mno", 1400, -12000, -60, 5.0, true, 8); +// addOneNode(pSchema, tagsLen, pSkipList, "tm5\0", 5, 60.5, "pqr", 1500, -12500, -70, 6.0, false, 8); +// addOneNode(pSchema, tagsLen, pSkipList, "tm6\0", 6, 70.5, "stu", 1600, -13000, -80, 7.0, true, 8); +// +// return pSkipList; +//} +// +// static tSkipList *createSkipList_binary(SSchema *pSchema, int32_t numOfTags) { +// int32_t tagsLen = 0; +// for (int32_t i = 0; i < numOfTags; ++i) { +// tagsLen += pSchema[i].bytes; +// } +// +// tSkipList *pSkipList = tSkipListCreate(10, pSchema[0].type, 4); +// +// addOneNode_binary(pSchema, tagsLen, pSkipList, "tm0\0", 0, 10.5, "abc", 1000, -10000, -20, 1.0, true, 8); +// addOneNode_binary(pSchema, tagsLen, pSkipList, "tm1\0", 1, 20.5, "def", 1100, -10500, -30, 2.0, false, 8); +// addOneNode_binary(pSchema, tagsLen, pSkipList, "tm2\0", 2, 30.5, "ghi", 1200, -11000, -40, 3.0, true, 8); +// addOneNode_binary(pSchema, tagsLen, pSkipList, "tm3\0", 3, 40.5, "jkl", 1300, -11500, -50, 4.0, false, 8); +// addOneNode_binary(pSchema, tagsLen, pSkipList, "tm4\0", 4, 50.5, "mno", 1400, -12000, -60, 5.0, true, 8); +// addOneNode_binary(pSchema, tagsLen, pSkipList, "tm5\0", 5, 60.5, "pqr", 1500, -12500, -70, 6.0, false, 8); +// addOneNode_binary(pSchema, tagsLen, pSkipList, "tm6\0", 6, 70.5, "stu", 1600, -13000, -80, 7.0, true, 8); +// +// return pSkipList; +//} + +static void testQueryStr(SSchema *schema, int32_t numOfCols, char *sql, tSkipList *pSkipList, ResultObj *pResult) { + tExprNode *pExpr = NULL; + tSQLBinaryExprFromString(&pExpr, schema, numOfCols, sql, strlen(sql)); + + char str[512] = {0}; + int32_t len = 0; + if (pExpr == NULL) { + printf("-----error in parse syntax:%s\n\n", sql); + assert(pResult == NULL); + return; + } + + tSQLBinaryExprToString(pExpr, str, &len); + printf("expr is: %s\n", str); + + SArray *result = NULL; + // tSQLBinaryExprTraverse(pExpr, pSkipList, result, tSkipListNodeFilterCallback, &result); + // printf("the result is:%lld\n", result.num); + // + // bool findResult = false; + // for (int32_t i = 0; i < result.num; ++i) { + // STabObj *pm = (STabObj *)result.pRes[i]; + // printf("meterid:%s,\t", pm->meterId); + // + // for (int32_t j = 0; j < pResult->numOfResult; ++j) { + // if (strcmp(pm->meterId, pResult->resultName[j]) == 0) { + // findResult = true; + // break; + // } + // } + // assert(findResult == true); + // findResult = false; + // } + + printf("\n\n"); + tExprTreeDestroy(&pExpr, NULL); +} + +static void Left2RightTest(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList) { + char str[256] = {0}; + + char *t0[1] = {"tm0"}; + char *t1[1] = {"tm1"}; + char *sql = NULL; + + ResultObj res = {1, {"tm1"}}; + testQueryStr(schema, numOfCols, "a=1", pSkipList, &res); + + char *tt[1] = {"tm6"}; + setValue(&res, 1, tt); + testQueryStr(schema, numOfCols, "a>=6", pSkipList, &res); + + setValue(&res, 1, t0); + testQueryStr(schema, numOfCols, "b<=10.6", pSkipList, &res); + + strcpy(str, "c<>'pqr'"); + char *t2[6] = {"tm0", "tm1", "tm2", "tm3", "tm4", "tm6"}; + setValue(&res, 6, t2); + testQueryStr(schema, numOfCols, str, pSkipList, &res); + + strcpy(str, "c='abc'"); + setValue(&res, 1, t0); + testQueryStr(schema, numOfCols, str, pSkipList, &res); + + char *t3[6] = {"tm1", "tm2", "tm3", "tm4", "tm5", "tm6"}; + setValue(&res, 6, t3); + testQueryStr(schema, numOfCols, "d>1050", pSkipList, &res); + + char *t4[3] = {"tm4", "tm5", "tm6"}; + setValue(&res, 3, t4); + testQueryStr(schema, numOfCols, "g>4.5980765", pSkipList, &res); + + char *t5[4] = {"tm0", "tm2", "tm4", "tm6"}; + setValue(&res, 4, t5); + testQueryStr(schema, numOfCols, "h=true", pSkipList, &res); + + char *t6[3] = {"tm1", "tm3", "tm5"}; + setValue(&res, 3, t6); + testQueryStr(schema, numOfCols, "h=0", pSkipList, &res); + + sql = "(((b<40)))\0"; + char *t7[3] = {"tm0", "tm1", "tm2"}; + setValue(&res, 3, t7); + testQueryStr(schema, numOfCols, sql, pSkipList, &res); + + sql = "((a=1) or (a=10)) or ((b=12))"; + setValue(&res, 1, t1); + testQueryStr(schema, numOfCols, sql, pSkipList, &res); + + sql = "((((((a>0 and a<2))) or a=6) or a=3) or (b=50.5)) and h=0"; + char *t8[2] = {"tm1", "tm3"}; + setValue(&res, 2, t8); + testQueryStr(schema, numOfCols, sql, pSkipList, &res); + + char *tf[1] = {"tm6"}; + setValue(&res, 1, tf); + testQueryStr(schema, numOfCols, "e = -13000", pSkipList, &res); + + char *ft[5] = {"tm0", "tm1", "tm2", "tm3", "tm4"}; + setValue(&res, 5, ft); + testQueryStr(schema, numOfCols, "f > -65", pSkipList, &res); +} + +void Right2LeftTest(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList) { + ResultObj res = {1, {"tm1"}}; + testQueryStr(schema, numOfCols, "((1=a))", pSkipList, &res); + + char *t9[2] = {"tm0", "tm1"}; + setValue(&res, 2, t9); + testQueryStr(schema, numOfCols, "1>=a", pSkipList, &res); + + char *t0[1] = {"tm0"}; + setValue(&res, 1, t0); + testQueryStr(schema, numOfCols, "10.6>=b", pSkipList, &res); + + char *t10[3] = {"tm1", "tm3", "tm5"}; + setValue(&res, 3, t10); + testQueryStr(schema, numOfCols, "0=h", pSkipList, &res); +} + +static void IllegalExprTest(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList) { + testQueryStr(schema, numOfCols, "h=", pSkipList, NULL); + testQueryStr(schema, numOfCols, "h<", pSkipList, NULL); + testQueryStr(schema, numOfCols, "a=1 and ", pSkipList, NULL); + testQueryStr(schema, numOfCols, "and or", pSkipList, NULL); + testQueryStr(schema, numOfCols, "and a = 1 or", pSkipList, NULL); + testQueryStr(schema, numOfCols, "(())", pSkipList, NULL); + testQueryStr(schema, numOfCols, "(", pSkipList, NULL); + testQueryStr(schema, numOfCols, "(a", pSkipList, NULL); + testQueryStr(schema, numOfCols, "(a)", pSkipList, NULL); + testQueryStr(schema, numOfCols, "())", pSkipList, NULL); + testQueryStr(schema, numOfCols, "a===1", pSkipList, NULL); + testQueryStr(schema, numOfCols, "a=1 and ", pSkipList, NULL); +} + +static void Left2RightTest_binary(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList) { + char str[256] = {0}; + char *sql = NULL; + + char *t0[1] = {"tm0"}; + char *t1[1] = {"tm1"}; + + ResultObj res = {1, {"tm0"}}; + strcpy(str, "a='abc'"); + testQueryStr(schema, numOfCols, str, pSkipList, &res); + + char *tt[1] = {"tm6"}; + setValue(&res, 1, tt); + testQueryStr(schema, numOfCols, "c>=6", pSkipList, &res); + + setValue(&res, 1, t0); + testQueryStr(schema, numOfCols, "b<=10.6", pSkipList, &res); + + strcpy(str, "a<>'pqr'"); + char *t2[6] = {"tm0", "tm1", "tm2", "tm3", "tm4", "tm6"}; + setValue(&res, 6, t2); + testQueryStr(schema, numOfCols, str, pSkipList, &res); + + strcpy(str, "a='abc'"); + setValue(&res, 1, t0); + testQueryStr(schema, numOfCols, str, pSkipList, &res); + + char *t3[6] = {"tm1", "tm2", "tm3", "tm4", "tm5", "tm6"}; + setValue(&res, 6, t3); + testQueryStr(schema, numOfCols, "d>1050", pSkipList, &res); + + char *t4[3] = {"tm4", "tm5", "tm6"}; + setValue(&res, 3, t4); + testQueryStr(schema, numOfCols, "g>4.5980765", pSkipList, &res); + + char *t5[4] = {"tm0", "tm2", "tm4", "tm6"}; + setValue(&res, 4, t5); + testQueryStr(schema, numOfCols, "h=true", pSkipList, &res); + + char *t6[3] = {"tm1", "tm3", "tm5"}; + setValue(&res, 3, t6); + testQueryStr(schema, numOfCols, "h=0", pSkipList, &res); + + sql = "(((b<40)))\0"; + char *t7[3] = {"tm0", "tm1", "tm2"}; + setValue(&res, 3, t7); + testQueryStr(schema, numOfCols, sql, pSkipList, &res); + + sql = "((c=1) or (c=10)) or ((b=12))\0"; + setValue(&res, 1, t1); + testQueryStr(schema, numOfCols, sql, pSkipList, &res); + + sql = "((((((c>0 and c<2))) or c=6) or c=3) or (b=50.5)) and h=false\0"; + char *t8[2] = {"tm1", "tm3"}; + setValue(&res, 2, t8); + testQueryStr(schema, numOfCols, sql, pSkipList, &res); +} + +static void Right2LeftTest_binary(SSchema *schema, int32_t numOfCols, tSkipList *pSkipList) { + char str[256] = {0}; + char *sql = NULL; + + char *t0[1] = {"tm0"}; + char *t1[1] = {"tm1"}; + + ResultObj res = {1, {"tm0"}}; + strcpy(str, "'abc'=a"); + testQueryStr(schema, numOfCols, str, pSkipList, &res); + + char *tt[1] = {"tm6"}; + setValue(&res, 1, tt); + testQueryStr(schema, numOfCols, "6<=c", pSkipList, &res); + + setValue(&res, 1, t0); + testQueryStr(schema, numOfCols, "10.6>=b", pSkipList, &res); + + strcpy(str, "'pqr'<>a"); + char *t2[6] = {"tm0", "tm1", "tm2", "tm3", "tm4", "tm6"}; + setValue(&res, 6, t2); + testQueryStr(schema, numOfCols, str, pSkipList, &res); +} + +namespace { +// two level expression tree +tExprNode *createExpr1() { + auto *pLeft = (tExprNode*) calloc(1, sizeof(tExprNode)); + pLeft->nodeType = TSQL_NODE_COL; + pLeft->pSchema = (SSchema*) calloc(1, sizeof(SSchema)); + + strcpy(pLeft->pSchema->name, "col_a"); + pLeft->pSchema->type = TSDB_DATA_TYPE_INT; + pLeft->pSchema->bytes = sizeof(int32_t); + pLeft->pSchema->colId = 1; + + auto *pRight = (tExprNode*) calloc(1, sizeof(tExprNode)); + pRight->nodeType = TSQL_NODE_VALUE; + pRight->pVal = (tVariant*) calloc(1, sizeof(tVariant)); + + pRight->pVal->nType = TSDB_DATA_TYPE_INT; + pRight->pVal->i64Key = 12; + + auto *pRoot = (tExprNode*) calloc(1, sizeof(tExprNode)); + pRoot->nodeType = TSQL_NODE_EXPR; + + pRoot->_node.optr = TSDB_RELATION_EQUAL; + pRoot->_node.pLeft = pLeft; + pRoot->_node.pRight = pRight; + pRoot->_node.hasPK = true; + + return pRoot; +} + +// thress level expression tree +tExprNode* createExpr2() { + auto *pLeft2 = (tExprNode*) calloc(1, sizeof(tExprNode)); + pLeft2->nodeType = TSQL_NODE_COL; + pLeft2->pSchema = (SSchema*) calloc(1, sizeof(SSchema)); + + strcpy(pLeft2->pSchema->name, "col_a"); + pLeft2->pSchema->type = TSDB_DATA_TYPE_BINARY; + pLeft2->pSchema->bytes = 20; + pLeft2->pSchema->colId = 1; + + auto *pRight2 = (tExprNode*) calloc(1, sizeof(tExprNode)); + pRight2->nodeType = TSQL_NODE_VALUE; + pRight2->pVal = (tVariant*) calloc(1, sizeof(tVariant)); + + pRight2->pVal->nType = TSDB_DATA_TYPE_BINARY; + const char* v = "hello world!"; + pRight2->pVal->pz = strdup(v); + pRight2->pVal->nLen = strlen(v); + + auto *p1 = (tExprNode*) calloc(1, sizeof(tExprNode)); + p1->nodeType = TSQL_NODE_EXPR; + + p1->_node.optr = TSDB_RELATION_LIKE; + p1->_node.pLeft = pLeft2; + p1->_node.pRight = pRight2; + p1->_node.hasPK = false; + + auto *pLeft1 = (tExprNode*) calloc(1, sizeof(tExprNode)); + pLeft1->nodeType = TSQL_NODE_COL; + pLeft1->pSchema = (SSchema*) calloc(1, sizeof(SSchema)); + + strcpy(pLeft1->pSchema->name, "col_b"); + pLeft1->pSchema->type = TSDB_DATA_TYPE_DOUBLE; + pLeft1->pSchema->bytes = 8; + pLeft1->pSchema->colId = 99; + + auto *pRight1 = (tExprNode*) calloc(1, sizeof(tExprNode)); + pRight1->nodeType = TSQL_NODE_VALUE; + pRight1->pVal = (tVariant*) calloc(1, sizeof(tVariant)); + + pRight1->pVal->nType = TSDB_DATA_TYPE_DOUBLE; + pRight1->pVal->dKey = 91.99; + + auto *p2 = (tExprNode*) calloc(1, sizeof(tExprNode)); + p2->nodeType = TSQL_NODE_EXPR; + + p2->_node.optr = TSDB_RELATION_LARGE_EQUAL; + p2->_node.pLeft = pLeft1; + p2->_node.pRight = pRight1; + p2->_node.hasPK = false; + + auto *pRoot = (tExprNode*) calloc(1, sizeof(tExprNode)); + pRoot->nodeType = TSQL_NODE_EXPR; + + pRoot->_node.optr = TSDB_RELATION_OR; + pRoot->_node.pLeft = p1; + pRoot->_node.pRight = p2; + pRoot->_node.hasPK = true; + return pRoot; +} + +void exprSerializeTest1() { + tExprNode* p1 = createExpr1(); + SBuffer buf = exprTreeToBinary(p1); + + size_t size = tbufTell(&buf); + ASSERT_TRUE(size > 0); + char* b = tbufGetData(&buf, false); + + tExprNode* p2 = exprTreeFromBinary(b, size); + ASSERT_EQ(p1->nodeType, p2->nodeType); + + ASSERT_EQ(p2->_node.optr, p1->_node.optr); + ASSERT_EQ(p2->_node.pLeft->nodeType, p1->_node.pLeft->nodeType); + ASSERT_EQ(p2->_node.pRight->nodeType, p1->_node.pRight->nodeType); + + SSchema* s1 = p1->_node.pLeft->pSchema; + SSchema* s2 = p2->_node.pLeft->pSchema; + + ASSERT_EQ(s2->colId, s1->colId); + ASSERT_EQ(s2->type, s1->type); + ASSERT_EQ(s2->bytes, s1->bytes); + ASSERT_STRCASEEQ(s2->name, s1->name); + + tVariant* v1 = p1->_node.pRight->pVal; + tVariant* v2 = p2->_node.pRight->pVal; + + ASSERT_EQ(v1->nType, v2->nType); + ASSERT_EQ(v1->i64Key, v2->i64Key); + ASSERT_EQ(p1->_node.hasPK, p2->_node.hasPK); + + tExprTreeDestroy(&p1, nullptr); + tExprTreeDestroy(&p2, nullptr); + + tbufClose(&buf, false); +} + +void exprSerializeTest2() { + tExprNode* p1 = createExpr2(); + SBuffer buf = exprTreeToBinary(p1); + + size_t size = tbufTell(&buf); + ASSERT_TRUE(size > 0); + char* b = tbufGetData(&buf, false); + + tExprNode* p2 = exprTreeFromBinary(b, size); + ASSERT_EQ(p1->nodeType, p2->nodeType); + + ASSERT_EQ(p2->_node.optr, p1->_node.optr); + ASSERT_EQ(p2->_node.pLeft->nodeType, p1->_node.pLeft->nodeType); + ASSERT_EQ(p2->_node.pRight->nodeType, p1->_node.pRight->nodeType); + + tExprNode* c1Left = p1->_node.pLeft; + tExprNode* c2Left = p2->_node.pLeft; + + ASSERT_EQ(c1Left->nodeType, c2Left->nodeType); + + ASSERT_EQ(c2Left->nodeType, TSQL_NODE_EXPR); + ASSERT_EQ(c2Left->_node.optr, TSDB_RELATION_LIKE); + + ASSERT_STRCASEEQ(c2Left->_node.pLeft->pSchema->name, "col_a"); + ASSERT_EQ(c2Left->_node.pRight->nodeType, TSQL_NODE_VALUE); + + ASSERT_STRCASEEQ(c2Left->_node.pRight->pVal->pz, "hello world!"); + + tExprNode* c1Right = p1->_node.pRight; + tExprNode* c2Right = p2->_node.pRight; + + ASSERT_EQ(c1Right->nodeType, c2Right->nodeType); + ASSERT_EQ(c2Right->nodeType, TSQL_NODE_EXPR); + ASSERT_EQ(c2Right->_node.optr, TSDB_RELATION_LARGE_EQUAL); + ASSERT_EQ(c2Right->_node.pRight->pVal->dKey, 91.99); + + ASSERT_EQ(p2->_node.hasPK, true); + + tExprTreeDestroy(&p1, nullptr); + tExprTreeDestroy(&p2, nullptr); + + tbufClose(&buf, false); +} +} // namespace +TEST(testCase, astTest) { + exprSerializeTest2(); +} \ No newline at end of file diff --git a/src/util/inc/tbuffer.h b/src/util/inc/tbuffer.h index 9dc6d97eb6..d52031fc6a 100644 --- a/src/util/inc/tbuffer.h +++ b/src/util/inc/tbuffer.h @@ -13,14 +13,15 @@ * along with this program. If not, see . */ -#include -#include -#include -#include - #ifndef TDENGINE_TBUFFER_H #define TDENGINE_TBUFFER_H +#include "setjmp.h" +#include "os.h" + +#ifdef __cplusplus +extern "C" { +#endif /* SBuffer can be used to read or write a buffer, but cannot be used for both @@ -80,37 +81,33 @@ int main(int argc, char** argv) { */ typedef struct { jmp_buf jb; - char* data; - size_t pos; - size_t size; + char* data; + size_t pos; + size_t size; } SBuffer; - // common functions can be used in both read & write #define tbufThrowError(buf, code) longjmp((buf)->jb, (code)) size_t tbufTell(SBuffer* buf); size_t tbufSeekTo(SBuffer* buf, size_t pos); size_t tbufSkip(SBuffer* buf, size_t size); -void tbufClose(SBuffer* buf, bool keepData); - +void tbufClose(SBuffer* buf, bool keepData); // basic read functions -#define tbufBeginRead(buf, data, len) (((buf)->data = (char*)data), ((buf)->pos = 0), ((buf)->size = ((data) == NULL) ? 0 : (len)), setjmp((buf)->jb)) -char* tbufRead(SBuffer* buf, size_t size); -void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); +#define tbufBeginRead(buf, _data, len) ((buf)->data = (char*)(_data), ((buf)->pos = 0), ((buf)->size = ((_data) == NULL) ? 0 : (len)), setjmp((buf)->jb)) +char* tbufRead(SBuffer* buf, size_t size); +void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); const char* tbufReadString(SBuffer* buf, size_t* len); -size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); - +size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); // basic write functions #define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb)) -void tbufEnsureCapacity(SBuffer* buf, size_t size); +void tbufEnsureCapacity(SBuffer* buf, size_t size); char* tbufGetData(SBuffer* buf, bool takeOver); -void tbufWrite(SBuffer* buf, const void* data, size_t size); -void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); -void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); -void tbufWriteString(SBuffer* buf, const char* str); - +void tbufWrite(SBuffer* buf, const void* data, size_t size); +void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); +void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); +void tbufWriteString(SBuffer* buf, const char* str); // read & write function for primitive types #ifndef TBUFFER_DEFINE_FUNCTION @@ -120,17 +117,21 @@ void tbufWriteString(SBuffer* buf, const char* str); void tbufWrite##name##At(SBuffer* buf, size_t pos, type data); #endif -TBUFFER_DEFINE_FUNCTION( bool, Bool ) -TBUFFER_DEFINE_FUNCTION( char, Char ) -TBUFFER_DEFINE_FUNCTION( int8_t, Int8 ) -TBUFFER_DEFINE_FUNCTION( uint8_t, Unt8 ) -TBUFFER_DEFINE_FUNCTION( int16_t, Int16 ) -TBUFFER_DEFINE_FUNCTION( uint16_t, Uint16 ) -TBUFFER_DEFINE_FUNCTION( int32_t, Int32 ) -TBUFFER_DEFINE_FUNCTION( uint32_t, Uint32 ) -TBUFFER_DEFINE_FUNCTION( int64_t, Int64 ) -TBUFFER_DEFINE_FUNCTION( uint64_t, Uint64 ) -TBUFFER_DEFINE_FUNCTION( float, Float ) -TBUFFER_DEFINE_FUNCTION( double, Double ) +TBUFFER_DEFINE_FUNCTION(bool, Bool) +TBUFFER_DEFINE_FUNCTION(char, Char) +TBUFFER_DEFINE_FUNCTION(int8_t, Int8) +TBUFFER_DEFINE_FUNCTION(uint8_t, Unt8) +TBUFFER_DEFINE_FUNCTION(int16_t, Int16) +TBUFFER_DEFINE_FUNCTION(uint16_t, Uint16) +TBUFFER_DEFINE_FUNCTION(int32_t, Int32) +TBUFFER_DEFINE_FUNCTION(uint32_t, Uint32) +TBUFFER_DEFINE_FUNCTION(int64_t, Int64) +TBUFFER_DEFINE_FUNCTION(uint64_t, Uint64) +TBUFFER_DEFINE_FUNCTION(float, Float) +TBUFFER_DEFINE_FUNCTION(double, Double) + +#ifdef __cplusplus +} +#endif #endif \ No newline at end of file diff --git a/src/util/src/tbuffer.c b/src/util/src/tbuffer.c index ac7d22078d..a83d7dddb0 100644 --- a/src/util/src/tbuffer.c +++ b/src/util/src/tbuffer.c @@ -30,7 +30,7 @@ tbufWriteAt(buf, pos, &data, sizeof(data));\ } -#include "../inc/tbuffer.h" +#include "tbuffer.h" //////////////////////////////////////////////////////////////////////////////// @@ -119,13 +119,14 @@ void tbufEnsureCapacity(SBuffer* buf, size_t size) { } char* tbufGetData(SBuffer* buf, bool takeOver) { - char* ret = buf->data; - if (takeOver) { - buf->pos = 0; - buf->size = 0; - buf->data = NULL; - } - return ret; + char* ret = buf->data; + if (takeOver) { + buf->pos = 0; + buf->size = 0; + buf->data = NULL; + } + + return ret; } void tbufEndWrite(SBuffer* buf) { diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index ff8da1cdad..bd6699eb84 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -147,6 +147,11 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct } void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { + if (pIter->numOfFGroups == 0) { + assert(pIter->pFileGroup == NULL); + return; + } + int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags); if (ptr == NULL) { diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 5635d9a98f..599d8bd347 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -424,6 +424,28 @@ static SDataBlockInfo getTrueDataBlockInfo(STsdbQueryHandle* pHandle, STableChec SArray *getDefaultLoadColumns(STsdbQueryHandle *pQueryHandle, bool loadTS); static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCols, SArray *sa); +static bool doLoadDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { + SQueryFilePos *cur = &pQueryHandle->cur; + + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); + SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; + + SCompData* data = calloc(1, sizeof(SCompData)+ sizeof(SCompCol)*pBlock->numOfCols); + + data->numOfCols = pBlock->numOfCols; + data->uid = pCheckInfo->pTableObj->tableId.uid; + + pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096); + tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema); + + SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; + if (pFile->fd == FD_INITIALIZER) { + pFile->fd = open(pFile->fname, O_RDONLY); + } + + tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data); +} + static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { SQueryFilePos *cur = &pQueryHandle->cur; @@ -432,31 +454,18 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { SArray *sa = getDefaultLoadColumns(pQueryHandle, true); if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { + + // query ended in current block if (pQueryHandle->window.ekey < pBlock->keyLast) { - SCompData* data = calloc(1, sizeof(SCompData)+ sizeof(SCompCol)*pBlock->numOfCols); - - data->numOfCols = pBlock->numOfCols; - data->uid = pCheckInfo->pTableObj->tableId.uid; - - pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096); - tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema); - - SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; - if (pFile->fd == FD_INITIALIZER) { - pFile->fd = open(pFile->fname, O_RDONLY); - } - - if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { - //do something - } + doLoadDataFromFileBlock(pQueryHandle); + filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); } - } else { + } else {// todo desc query if (pQueryHandle->window.ekey > pBlock->keyFirst) { -// loadDataBlockIntoMem_(pQueryHandle, pBlock, &pQueryHandle->pFields[cur->slot], cur->fileId, sa); +// } } - filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); return pQueryHandle->realNumOfRows > 0; } @@ -508,7 +517,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { // next block in the same file cur->slot += step; - SCompBlock* pBlock = &pQueryHandle->pBlock[cur->slot]; + SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1; return loadQualifiedDataFromFileBlock(pQueryHandle); } @@ -736,12 +745,23 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf int32_t index = -1; int32_t tid = pCheckInfo->tableId.tid; - SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; - while (1) { + while (pCheckInfo->pFileGroup != NULL) { if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) { break; } + + SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; + + // no data block in current file, try next + if (pCheckInfo->compIndex[tid].numOfSuperBlocks == 0) { + dTrace("QInfo:%p no data block in file, fid:%d, tid:%d, try next", pQueryHandle->qinfo, + pCheckInfo->pFileGroup->fileId, tid); + + pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter); + + continue; + } index = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key); @@ -769,12 +789,11 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf // load first data block into memory failed, caused by disk block error bool blockLoaded = false; - SArray *sa = NULL; + SArray *sa = getDefaultLoadColumns(pQueryHandle, true); // todo no need to loaded at all cur->slot = index; - sa = getDefaultLoadColumns(pQueryHandle, true); SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; SCompData* data = calloc(1, sizeof(SCompData)+ sizeof(SCompCol)*pBlock->numOfCols); @@ -784,6 +803,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096); tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema); + SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; if (pFile->fd == FD_INITIALIZER) { pFile->fd = open(pFile->fname, O_RDONLY); } @@ -973,11 +993,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); SDataBlockInfo binfo = getTrueDataBlockInfo(pHandle, pCheckInfo); - if (pHandle->realNumOfRows <= binfo.size) { + assert(pHandle->realNumOfRows <= binfo.size); + + if (pHandle->realNumOfRows < binfo.size) { return pHandle->pColumns; } else { - // todo do load data block - assert(0); + SArray *sa = getDefaultLoadColumns(pHandle, true); + + doLoadDataFromFileBlock(pHandle); + filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa); + return pHandle->pColumns; } } } @@ -1197,7 +1222,7 @@ static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pS } void filterPrepare(void* expr, void* param) { - tSQLSyntaxNode *pExpr = (tSQLSyntaxNode*) expr; + tExprNode *pExpr = (tExprNode*) expr; if (pExpr->_node.info != NULL) { return; } @@ -1276,7 +1301,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond) { STColumn* stcol = schemaColAt(pSTable->tagSchema, 0); - tSQLSyntaxNode* pExpr = NULL; + tExprNode* pExpr = NULL; tSQLBinaryExprFromString(&pExpr, stcol, schemaNCols(pSTable->tagSchema), pCond, strlen(pCond)); // failed to build expression, no result, return immediately @@ -1297,7 +1322,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond }; tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp); - tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo); + tExprTreeDestroy(&pExpr, tSQLListTraverseDestroyInfo); tansformQueryResult(pRes);