diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 98d7a5a731..bc309ff66c 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -60,7 +60,7 @@ def check_docs() { def file_changed = sh ( script: ''' cd ${WKC} - git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/" + git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/" || : ''', returnStdout: true ).trim() diff --git a/docs/examples/python/subscribe_demo.py b/docs/examples/python/subscribe_demo.py deleted file mode 100644 index db9d49c3f4..0000000000 --- a/docs/examples/python/subscribe_demo.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -Python asynchronous subscribe demo. -run on Linux system with: python3 subscribe_demo.py -""" - -from ctypes import c_void_p - -import taos -import time - - -def query_callback(p_sub, p_result, p_param, code): - """ - :param p_sub: pointer returned by native API -- taos_subscribe - :param p_result: pointer to native TAOS_RES - :param p_param: None - :param code: error code - :return: None - """ - print("in callback") - result = taos.TaosResult(c_void_p(p_result)) - # raise exception if error occur - result.check_error(code) - for row in result.rows_iter(): - print(row) - print(f"{result.row_count} rows consumed.") - - -if __name__ == '__main__': - conn = taos.connect() - restart = True - topic = "topic-meter-current-bg" - sql = "select * from power.meters where current > 10" # Error sql - interval = 2000 # consumption interval in microseconds. - _ = conn.subscribe(restart, topic, sql, interval, query_callback) - # Note: we received the return value as _ above, to avoid the TaosSubscription object to be deleted by gc. - while True: - time.sleep(10) # use Ctrl + C to interrupt diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 52c73f85f5..4da4747108 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -115,6 +115,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); +int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo); size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f3b395cc7c..b683723575 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -428,10 +428,6 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray // int64_t st = taosGetTimestampUs(); for (int32_t i = 0; i < rows; i++) { int64_t* uid = taosArrayGet(uidList, i); - void* tag = taosHashGet(tags, uid, sizeof(int64_t)); - if (suid != 0) { - ASSERT(tag); - } for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); @@ -443,6 +439,8 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2); #endif }else{ + void* tag = taosHashGet(tags, uid, sizeof(int64_t)); + ASSERT(tag); STagVal tagVal = {0}; tagVal.cid = pColInfo->info.colId; const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); @@ -503,6 +501,241 @@ end: return output.columnData; } +static void releaseColInfoData(void* pCol) { + if(pCol){ + SColumnInfoData* col = (SColumnInfoData*) pCol; + colDataDestroy(col); + taosMemoryFree(col); + } +} + +int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo){ + int32_t code = TSDB_CODE_SUCCESS; + SArray *pBlockList = NULL; + SSDataBlock *pResBlock = NULL; + SHashObj *tags = NULL; + SArray *uidList = NULL; + void *keyBuf = NULL; + SArray *groupData = NULL; + + int32_t rows = taosArrayGetSize(pTableListInfo->pTableList); + if(rows == 0){ + return TDB_CODE_SUCCESS; + } + + tagFilterAssist ctx = {0}; + ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); + if(ctx.colHash == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + ctx.index = 0; + ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); + if(ctx.cInfoList == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + + SNode* pNode = NULL; + FOREACH(pNode, group) { + nodesRewriteExprPostOrder(&pNode, getColumn, (void *)&ctx); + REPLACE_NODE(pNode); + } + + pResBlock = createDataBlock(); + if (pResBlock == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + + for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) { + SColumnInfoData colInfo = {{0}, 0}; + colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i); + blockDataAppendColInfo(pResBlock, &colInfo); + } + + uidList = taosArrayInit(rows, sizeof(uint64_t)); + for (int32_t i = 0; i < rows; ++i) { + STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i); + taosArrayPush(uidList, &pkeyInfo->uid); + } + +// int64_t stt = taosGetTimestampUs(); + tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + code = metaGetTableTags(metaHandle, pTableListInfo->suid, uidList, tags); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + +// int64_t stt1 = taosGetTimestampUs(); +// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt); + + code = blockDataEnsureCapacity(pResBlock, rows); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + +// int64_t st = taosGetTimestampUs(); + for (int32_t i = 0; i < rows; i++) { + int64_t* uid = taosArrayGet(uidList, i); + for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ + SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); + + if(pColInfo->info.colId == -1){ // tbname + char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + metaGetTableNameByUid(metaHandle, *uid, str); + colDataAppend(pColInfo, i, str, false); +#if TAG_FILTER_DEBUG + qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2); +#endif + }else{ + void* tag = taosHashGet(tags, uid, sizeof(int64_t)); + ASSERT(tag); + STagVal tagVal = {0}; + tagVal.cid = pColInfo->info.colId; + const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); + + if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)){ + colDataAppend(pColInfo, i, p, true); + } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { + colDataAppend(pColInfo, i, p, false); + } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + char *tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); + varDataSetLen(tmp, tagVal.nData); + memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); + colDataAppend(pColInfo, i, tmp, false); +#if TAG_FILTER_DEBUG + qDebug("tagfilter varch:%s", tmp+2); +#endif + taosMemoryFree(tmp); + } else { + colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false); +#if TAG_FILTER_DEBUG + if(pColInfo->info.type == TSDB_DATA_TYPE_INT){ + qDebug("tagfilter int:%d", *(int*)(&tagVal.i64)); + }else if(pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE){ + qDebug("tagfilter double:%f", *(double *)(&tagVal.i64)); + } +#endif + } + } + } + } + pResBlock->info.rows = rows; + +// int64_t st1 = taosGetTimestampUs(); +// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st); + + pBlockList = taosArrayInit(2, POINTER_BYTES); + taosArrayPush(pBlockList, &pResBlock); + + groupData = taosArrayInit(2, POINTER_BYTES); + FOREACH(pNode, group) { + SScalarParam output = {0}; + + switch (nodeType(pNode)) { + case QUERY_NODE_VALUE: + break; + case QUERY_NODE_COLUMN: + case QUERY_NODE_OPERATOR: + case QUERY_NODE_FUNCTION:{ + SExprNode* expNode = (SExprNode*)pNode; + code = createResultData(&expNode->resType, rows, &output); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + break; + } + default: + code = TSDB_CODE_OPS_NOT_SUPPORT; + goto end; + } + if(nodeType(pNode) == QUERY_NODE_COLUMN){ + SColumnNode* pSColumnNode = (SColumnNode*)pNode; + SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId); + code = colDataAssign(output.columnData, pColInfo, rows, NULL); + }else if(nodeType(pNode) == QUERY_NODE_VALUE){ + continue; + }else{ + code = scalarCalculate(pNode, pBlockList, &output); + } + if(code != TSDB_CODE_SUCCESS){ + releaseColInfoData(output.columnData); + goto end; + } + taosArrayPush(groupData, &output.columnData); + } + + int32_t keyLen = 0; + SNode* node; + FOREACH(node, group) { + SExprNode* pExpr = (SExprNode*)node; + keyLen += pExpr->resType.bytes; + } + + int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group); + keyLen += nullFlagSize; + + keyBuf = taosMemoryCalloc(1, keyLen); + if (keyBuf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + for(int i = 0; i < rows; i++){ + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + + char* isNull = (char*)keyBuf; + char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group); + for(int j = 0; j < taosArrayGetSize(groupData); j++){ + SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j); + + if (colDataIsNull_s(pValue, i)) { + isNull[j] = 1; + } else { + isNull[j] = 0; + char* data = colDataGetData(pValue, i); + if (pValue->info.type == TSDB_DATA_TYPE_JSON) { + if (tTagIsJson(data)) { + code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; + goto end; + } + if(tTagIsJsonNull(data)){ + isNull[j] = 1; + continue; + } + int32_t len = getJsonValueLen(data); + memcpy(pStart, data, len); + pStart += len; + } else if (IS_VAR_DATA_TYPE(pValue->info.type)) { + memcpy(pStart, data, varDataTLen(data)); + pStart += varDataTLen(data); + } else { + memcpy(pStart, data, pValue->info.bytes); + pStart += pValue->info.bytes; + } + } + } + + int32_t len = (int32_t)(pStart - (char*)keyBuf); + info->groupId = calcGroupId(keyBuf, len); + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + } + +// int64_t st2 = taosGetTimestampUs(); +// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); + + end: + taosMemoryFreeClear(keyBuf); + taosHashCleanup(tags); + taosHashCleanup(ctx.colHash); + taosArrayDestroy(ctx.cInfoList); + blockDataDestroy(pResBlock); + taosArrayDestroy(pBlockList); + taosArrayDestroy(uidList); + taosArrayDestroyP(groupData, releaseColInfoData); + return code; +} + int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 16a1cb898f..fc382b2e04 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3894,9 +3894,9 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { tDeleteSSchemaWrapper(pSchemaInfo->qsw); } -static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { +static int32_t sortTableGroup(STableListInfo* pTableListInfo) { taosArrayClear(pTableListInfo->pGroupList); - SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t)); + SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); @@ -3974,48 +3974,26 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, if (pTableListInfo->map == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t keyLen = 0; - void* keyBuf = NULL; - - SNode* node; - FOREACH(node, group) { - SExprNode* pExpr = (SExprNode*)node; - keyLen += pExpr->resType.bytes; - } - - int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group); - keyLen += nullFlagSize; - - keyBuf = taosMemoryCalloc(1, keyLen); - if (keyBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } bool assignUid = groupbyTbname(group); - int32_t groupNum = 0; size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - - if (assignUid) { + if(assignUid){ + for (int32_t i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); info->groupId = info->uid; - } else { - int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + } + }else{ + int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } - - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); - groupNum++; } - taosMemoryFree(keyBuf); - if (pTableListInfo->needSortTableByGroupId) { - return sortTableGroup(pTableListInfo, groupNum); + return sortTableGroup(pTableListInfo); } return TDB_CODE_SUCCESS; diff --git a/tools/shell/inc/shellInt.h b/tools/shell/inc/shellInt.h index 358377f804..26ca6895ac 100644 --- a/tools/shell/inc/shellInt.h +++ b/tools/shell/inc/shellInt.h @@ -95,6 +95,7 @@ typedef struct { TAOS* conn; TdThread pid; tsem_t cancelSem; + bool exit; #ifdef WEBSOCKET WS_TAOS* ws_conn; bool stop_query; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 724ac8fbfd..68e3a272c3 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -948,6 +948,10 @@ void shellCleanup(void *arg) { taosResetTerminalMode(); } void *shellCancelHandler(void *arg) { setThreadName("shellCancelHandler"); while (1) { + if (shell.exit == true) { + break; + } + if (tsem_wait(&shell.cancelSem) != 0) { taosMsleep(10); continue; @@ -961,7 +965,7 @@ void *shellCancelHandler(void *arg) { taos_kill_query(shell.conn); #ifdef WEBSOCKET } -#endif +#endif #ifdef WINDOWS printf("\n%s", shell.info.promptHeader); #endif @@ -1009,7 +1013,7 @@ int32_t shellExecute() { if (shell.args.restful || shell.args.cloud) { if (shell_conn_ws_server(1)) { return -1; - } + } } else { #endif if (shell.args.auth == NULL) { @@ -1043,7 +1047,7 @@ int32_t shellExecute() { if (shell.args.restful || shell.args.cloud) { ws_close(shell.ws_conn); } else { -#endif +#endif taos_close(shell.conn); #ifdef WEBSOCKET } @@ -1079,7 +1083,12 @@ int32_t shellExecute() { taosThreadCreate(&shell.pid, NULL, shellThreadLoop, NULL); taosThreadJoin(shell.pid, NULL); taosThreadClear(&shell.pid); + if (shell.exit) { + tsem_post(&shell.cancelSem); + break; + } } + taosThreadJoin(spid, NULL); shellCleanupHistory(); return 0; diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 703533f8a9..964082f3c3 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -19,6 +19,7 @@ SShellObj shell = {0}; int main(int argc, char *argv[]) { + shell.exit = false; #ifdef WEBSOCKET shell.args.timeout = 10; shell.args.cloud = true; @@ -46,7 +47,7 @@ int main(int argc, char *argv[]) { shellPrintHelp(); return 0; } -#ifdef WEBSOCKET +#ifdef WEBSOCKET shellCheckConnectMode(); #endif taos_init(); diff --git a/tools/shell/src/shellUtil.c b/tools/shell/src/shellUtil.c index e5e61e0b24..0430428c38 100644 --- a/tools/shell/src/shellUtil.c +++ b/tools/shell/src/shellUtil.c @@ -157,6 +157,6 @@ void shellExit() { taos_close(shell.conn); shell.conn = NULL; } + shell.exit = true; taos_cleanup(); - exit(EXIT_FAILURE); }