diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index bd956aeb74..90364987bb 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -349,7 +349,7 @@ typedef struct SSqlStream { int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn); void tscInitMsgsFp(); -int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); +int tsParseSql(SSqlObj *pSql, bool initialParse); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet); int tscProcessSql(SSqlObj *pSql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 3c54176d0a..2b99d23099 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -39,41 +39,26 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -int doAsyncParseSql(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); - if (code != TSDB_CODE_SUCCESS) { - tscError("failed to malloc payload"); - pSql->res.code = code; - - tscQueueAsyncRes(pSql); - return code; - } - - pRes->qhandle = 0; - pRes->numOfRows = 1; - - tscDump("%p SQL: %s", pSql, pSql->sqlstr); - return tsParseSql(pSql, true); -} - void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { pSql->signature = pSql; pSql->param = param; pSql->pTscObj = pObj; pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->fp = fp; + pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY); return; } + strtolower(pSql->sqlstr, sqlstr); - - int32_t code = doAsyncParseSql(pSql); + + tscDump("%p SQL: %s", pSql, pSql->sqlstr); + pSql->cmd.curSql = pSql->sqlstr; + + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 4b6178eb61..b03972bbfa 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1014,42 +1014,37 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) { * @param pSql * @return */ -int doParseInsertSql(SSqlObj *pSql, char *str) { +int tsParseInsertSql(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; + char* str = pCmd->curSql; int32_t totalNum = 0; int32_t code = TSDB_CODE_SUCCESS; - STableMetaInfo *pTableMetaInfo = NULL; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); assert(pQueryInfo != NULL); + STableMetaInfo *pTableMetaInfo = NULL; if (pQueryInfo->numOfTables == 0) { pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); } else { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); } - // TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536 - // but TSDB_PAYLOAD_SIZE is 65380 + // TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536, but TSDB_PAYLOAD_SIZE is 65380 if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE + 2048)) != TSDB_CODE_SUCCESS) { return code; } - assert(((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) - || ((NULL != pCmd->curSql) && (NULL != pCmd->pTableList))); - - if ((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) { + if (NULL == pCmd->pTableList) { pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error_clean; + goto _error; } } else { - assert((NULL != pCmd->curSql) && (NULL != pCmd->pTableList)); str = pCmd->curSql; } @@ -1075,7 +1070,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { */ if (totalNum == 0) { code = TSDB_CODE_TSC_INVALID_SQL; - goto _error_clean; + goto _error; } else { break; } @@ -1086,11 +1081,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { // Check if the table name available or not if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) { code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); - goto _error_clean; + goto _error; } if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } ptrdiff_t pos = pCmd->curSql - pSql->sqlstr; @@ -1103,20 +1098,19 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { * interrupted position. */ if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) { - tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql, + tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 ", %s", pSql, pos, pCmd->curSql); return code; } - // todo add to return - tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code)); + tscError("%p async insert parse error, code:%d, reason:%s", pSql, code, tstrerror(code)); pCmd->curSql = NULL; - goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? + goto _error; } if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); - goto _error_clean; + goto _error; } index = 0; @@ -1125,7 +1119,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z); - goto _error_clean; + goto _error; } STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -1137,7 +1131,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } /* @@ -1146,11 +1140,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { */ code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } } else if (sToken.type == TK_FILE) { if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } index = 0; @@ -1158,7 +1152,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { str += index; if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); - goto _error_clean; + goto _error; } char fname[PATH_MAX] = {0}; @@ -1168,7 +1162,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { wordexp_t full_path; if (wordexp(fname, &full_path, 0) != 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); - goto _error_clean; + goto _error; } strcpy(fname, full_path.we_wordv[0]); wordfree(&full_path); @@ -1179,7 +1173,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pDataBlock); if (ret != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); @@ -1190,7 +1184,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { SSchema * pSchema = tscGetTableSchema(pTableMeta); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } SParsedDataColInfo spd = {0}; @@ -1226,7 +1220,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (spd.hasVal[t] == true) { code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z); - goto _error_clean; + goto _error; } spd.hasVal[t] = true; @@ -1237,13 +1231,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (!findColumnIndex) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z); - goto _error_clean; + goto _error; } } if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) { code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z); - goto _error_clean; + goto _error; } index = 0; @@ -1252,16 +1246,16 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (sToken.type != TK_VALUES) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z); - goto _error_clean; + goto _error; } code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } } else { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z); - goto _error_clean; + goto _error; } } @@ -1272,7 +1266,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); @@ -1281,7 +1275,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { code = TSDB_CODE_SUCCESS; goto _clean; -_error_clean: +_error: pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); _clean: @@ -1294,7 +1288,7 @@ _clean: return code; } -int tsParseInsertSql(SSqlObj *pSql) { +int tsInsertInitialCheck(SSqlObj *pSql) { if (!pSql->pTscObj->writeAuth) { return TSDB_CODE_TSC_NO_WRITE_AUTH; } @@ -1312,30 +1306,23 @@ int tsParseInsertSql(SSqlObj *pSql) { SQueryInfo *pQueryInfo = NULL; tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); - TSDB_QUERY_SET_TYPE(pQueryInfo->type, pCmd->insertType); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType); sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); if (sToken.type != TK_INTO) { return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z); } - return doParseInsertSql(pSql, pSql->sqlstr + index); + pCmd->curSql = sToken.z + sToken.n; + return TSDB_CODE_SUCCESS; } int tsParseSql(SSqlObj *pSql, bool initialParse) { int32_t ret = TSDB_CODE_SUCCESS; - - if (initialParse) { - assert(!pSql->cmd.parseFinished); + SSqlCmd* pCmd = &pSql->cmd; - char* p = pSql->sqlstr; - pSql->sqlstr = NULL; - - tscPartiallyFreeSqlObj(pSql); - pSql->sqlstr = p; - } else if (!pSql->cmd.parseFinished) { - tscTrace("continue parse sql: %s", pSql->cmd.curSql); + if (!pCmd->parseFinished) { + tscTrace("%p resume to parse sql: %s", pSql, pCmd->curSql); } if (tscIsInsertData(pSql->sqlstr)) { @@ -1347,7 +1334,11 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { pSql->fetchFp = pSql->fp; pSql->fp = (void(*)())tscHandleMultivnodeInsert; } - + + if (initialParse && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) { + return ret; + } + ret = tsParseInsertSql(pSql); } else { ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); @@ -1362,11 +1353,9 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { /* * the pRes->code may be modified or released by another thread in tscTableMetaCallBack function, - * so do NOT use pRes->code to determine if the getTableMeta/getMetricMeta function - * invokes new threads to get data from mnode or simply retrieves data from cache. - * - * do NOT assign return code to pRes->code for the same reason since it may be released by another thread - * pRes->code = ret; + * so do NOT use pRes->code to determine if the getTableMeta function + * invokes new threads to get data from mgmt node or simply retrieves data from cache. + * do NOT assign return code to pRes->code for the same reason since it may be released by another thread already. */ return ret; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 2566e171f4..87af5fb5f7 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1690,7 +1690,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr return invalidSqlErrMsg(pQueryInfo->msg, msg3); } - if (pItem->pNode->pParam->nExpr > 1 && strlen(pItem->aliasName) > 0) { + if (pItem->pNode->pParam->nExpr > 1 && (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0)) { return invalidSqlErrMsg(pQueryInfo->msg, msg8); } @@ -1761,7 +1761,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr int32_t numOfFields = 0; // multicolumn selection does not support alias name - if (strlen(pItem->aliasName) != 0) { + if (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0) { return invalidSqlErrMsg(pQueryInfo->msg, msg8); } @@ -2642,7 +2642,7 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType, false); - size_t len = wcslen((wchar_t*)pColumnFilter->pz); + size_t len = twcslen((wchar_t*)pColumnFilter->pz); pColumnFilter->len = len * TSDB_NCHAR_SIZE; } else { tVariantDump(&pRight->val, (char*)&pColumnFilter->lowerBndd, colType, false); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0fa841bc7c..47a602a6fb 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -488,7 +488,7 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) { (pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - tscTrace("%p start to send msg to free qhandle in dnode, command:%s", pSql, sqlCmd[pCmd->command]); + tscTrace("%p send msg to dnode to free qhandle ASAP, command:%s", pSql, sqlCmd[pCmd->command]); pSql->freed = 1; tscProcessSql(pSql); @@ -510,18 +510,17 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) { void taos_free_result(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; - tscTrace("%p start to free result", res); - + if (pSql == NULL || pSql->signature != pSql) { - tscTrace("%p result has been freed", pSql); + tscTrace("%p sqlObj has been freed", pSql); return; } // The semaphore can not be changed while freeing async sub query objects. SSqlRes *pRes = &pSql->res; if (pRes == NULL || pRes->qhandle == 0) { - tscTrace("%p SqlObj is freed by app, qhandle is null", pSql); tscFreeSqlObj(pSql); + tscTrace("%p SqlObj is freed by app, qhandle is null", pSql); return; } @@ -529,6 +528,7 @@ void taos_free_result(TAOS_RES *res) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); if (pQueryInfo == NULL) { tscFreeSqlObj(pSql); + tscTrace("%p SqlObj is freed by app", pSql); return; } @@ -713,7 +713,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - pRes->numOfRows = 1; pRes->numOfTotal = 0; pRes->numOfClauseTotal = 0; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 6fc934b6c0..86a41b7ba4 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -503,8 +503,10 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } strtolower(pSql->sqlstr, sqlstr); + tscDump("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); - int32_t code = doAsyncParseSql(pSql); + + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { sem_wait(&pSql->rspSem); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index aaa5ab291f..6c6284b4a6 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1165,8 +1165,8 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pSql->res.qhandle = 0x1; - pSql->res.numOfRows = 0; - + assert(pSql->res.numOfRows == 0); + if (pSql->pSubs == NULL) { pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); if (pSql->pSubs == NULL) { @@ -1364,7 +1364,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tOrderDescriptor *pDesc = NULL; SColumnModel * pModel = NULL; - pRes->qhandle = 1; // hack the qhandle check + pRes->qhandle = 0x1; // hack the qhandle check const uint32_t nBufferSize = (1u << 16); // 64KB @@ -1845,34 +1845,41 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } -static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { +static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; SSqlCmd* pParentCmd = &pParentObj->cmd; SSubqueryState* pState = pSupporter->pState; - // increase the total inserted rows - if (numOfRows > 0) { - pParentObj->res.numOfRows += numOfRows; - } else { + // record the total inserted rows + if (numOfRows > 0 && tres != pParentObj) { + pParentObj->res.numOfRows += numOfRows; + } + + if (taos_errno(tres) != 0) { SSqlObj* pSql = (SSqlObj*) tres; assert(pSql != NULL && pSql->res.code == numOfRows); pParentObj->res.code = pSql->res.code; } - - taos_free_result(tres); + + // it is not the initial sqlObj, free it + if (tres != pParentObj) { + taos_free_result(tres); + } else { + assert(pParentObj->pSubs[0] == tres); + } + + tfree(pSupporter); if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { return; } tscTrace("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows); - - tfree(pState); - tfree(pSupporter); - + // release data block data + tfree(pState); pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); // restore user defined fp @@ -1886,9 +1893,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - pRes->qhandle = 1; // hack the qhandle check SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); pSql->numOfSubs = pDataBlocks->nSize; assert(pDataBlocks->nSize > 0); @@ -1896,52 +1901,84 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = pState->numOfTotal; - + pState->numOfRemain = pSql->numOfSubs; + pRes->code = TSDB_CODE_SUCCESS; - - int32_t i = 0; - for (; i < pSql->numOfSubs; ++i) { - SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); - pSupporter->pSql = pSql; - pSupporter->pState = pState; + + SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); + pSupporter->pSql = pSql; + pSupporter->pState = pState; + + pSql->fp = multiVnodeInsertFinalize; + pSql->param = pSupporter; + pSql->pSubs[0] = pSql; // the first sub insert points back to itself + tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pSql, 0); + + int32_t numOfSub = 1; + int32_t code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[0]); + if (code != TSDB_CODE_SUCCESS) { + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, 0, + pDataBlocks->nSize, code); + goto _error; + } + + for (; numOfSub < pSql->numOfSubs; ++numOfSub) { + SInsertSupporter* pSupporter1 = calloc(1, sizeof(SInsertSupporter)); + pSupporter1->pSql = pSql; + pSupporter1->pState = pState; - SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, TSDB_SQL_INSERT, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter1, TSDB_SQL_INSERT, NULL); if (pNew == NULL) { - tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); - break; + tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno)); + goto _error; } /* * assign the callback function to fetchFp to make sure that the error process function can restore - * the callback function (multiVnodeInsertMerge) correctly. + * the callback function (multiVnodeInsertFinalize) correctly. */ pNew->fetchFp = pNew->fp; - pSql->pSubs[i] = pNew; + pSql->pSubs[numOfSub] = pNew; - tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i); + code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[numOfSub]); + if (code != TSDB_CODE_SUCCESS) { + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, numOfSub, + pDataBlocks->nSize, code); + goto _error; + } else { + tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub); + } } - if (i < pSql->numOfSubs) { + if (numOfSub < pSql->numOfSubs) { tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; return pRes->code; // free all allocated resource } - - for (int32_t j = 0; j < pSql->numOfSubs; ++j) { + + // use the local variable + for (int32_t j = 0; j < numOfSub; ++j) { SSqlObj *pSub = pSql->pSubs[j]; - int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]); - - if (code != TSDB_CODE_SUCCESS) { - tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j, - pDataBlocks->nSize, code); - } - tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); tscProcessSql(pSub); } return TSDB_CODE_SUCCESS; + + _error: + // restore the udf fp + pSql->fp = pSql->fetchFp; + pSql->pSubs[0] = NULL; + + tfree(pState); + tfree(pSql->param); + + for(int32_t j = 1; j < numOfSub; ++j) { + tfree(pSql->pSubs[j]->param); + taos_free_result(pSql->pSubs[j]); + } + + return TSDB_CODE_TSC_OUT_OF_MEMORY; } void tscBuildResFromSubqueries(SSqlObj *pSql) { diff --git a/src/query/src/tvariant.c b/src/query/src/tvariant.c index 1650db6cfd..1b64a7aefa 100644 --- a/src/query/src/tvariant.c +++ b/src/query/src/tvariant.c @@ -184,7 +184,7 @@ int32_t tVariantToString(tVariant *pVar, char *dst) { case TSDB_DATA_TYPE_NCHAR: { dst[0] = '\''; - taosUcs4ToMbs(pVar->wpz, (wcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1); + taosUcs4ToMbs(pVar->wpz, (twcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1); int32_t len = strlen(dst); dst[len] = '\''; dst[len + 1] = 0; @@ -416,7 +416,7 @@ static int32_t toNchar(tVariant *pVariant, char **pDest, int32_t *pDestSize) { } pVariant->wpz = pWStr; - *pDestSize = wcslen(pVariant->wpz); + *pDestSize = twcslen(pVariant->wpz); // shrink the allocate memory, no need to check here. char* tmp = realloc(pVariant->wpz, (*pDestSize + 1)*TSDB_NCHAR_SIZE); diff --git a/src/query/tests/unitTest.cpp b/src/query/tests/unitTest.cpp index c5b1cbf858..59fd326ef4 100644 --- a/src/query/tests/unitTest.cpp +++ b/src/query/tests/unitTest.cpp @@ -66,7 +66,7 @@ static void _init_tvariant_nchar(tVariant* t) { t->wpz = (wchar_t*)calloc(1, 20 * TSDB_NCHAR_SIZE); t->nType = TSDB_DATA_TYPE_NCHAR; wcscpy(t->wpz, L"-2000000.8765"); - t->nLen = wcslen(t->wpz); + t->nLen = twcslen(t->wpz); } int main(int argc, char** argv) { diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index e3e52d46ff..a314f0e31d 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -119,6 +119,8 @@ extern "C" { uint32_t taosRand(void); +size_t twcslen(const wchar_t *wcs); + int32_t strdequote(char *src); size_t strtrim(char *src); diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 150767643f..aa5bfe322a 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -57,6 +57,27 @@ uint32_t taosRand(void) } #endif +size_t twcslen(const wchar_t *wcs) { +#ifdef WINDOWS + int *wstr = (int *)wcs; + if (NULL == wstr) { + return 0; + } + + size_t n = 0; + while (1) { + if (0 == *wstr++) { + break; + } + n++; + } + + return n; +#else + return wcslen(wcs); +#endif +} + int32_t strdequote(char *z) { if (z == NULL) { return 0; diff --git a/tests/pytest/alter/alter_table_crash.py b/tests/pytest/alter/alter_table_crash.py new file mode 100644 index 0000000000..903bb60e6a --- /dev/null +++ b/tests/pytest/alter/alter_table_crash.py @@ -0,0 +1,83 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def run(self): + tdSql.prepare() + + print("==============Case 1: add column, restart taosd, drop the same colum then add it back") + tdSql.execute( + "create table st(ts timestamp, speed int) tags(loc nchar(20))") + tdSql.execute( + "insert into t1 using st tags('beijing') values(now, 1)") + tdSql.execute( + "alter table st add column tbcol binary(20)") + + # restart taosd + tdDnodes.forcestop(1) + tdDnodes.start(1) + + tdSql.execute( + "alter table st drop column tbcol") + tdSql.execute( + "alter table st add column tbcol binary(20)") + + tdSql.query("select * from st") + tdSql.checkRows(1) + + + print("==============Case 2: keep adding columns, restart taosd") + tdSql.execute( + "create table dt(ts timestamp, tbcol1 tinyint) tags(tgcol1 tinyint)") + tdSql.execute( + "alter table dt add column tbcol2 int") + tdSql.execute( + "alter table dt add column tbcol3 smallint") + tdSql.execute( + "alter table dt add column tbcol4 bigint") + tdSql.execute( + "alter table dt add column tbcol5 float") + tdSql.execute( + "alter table dt add column tbcol6 double") + tdSql.execute( + "alter table dt add column tbcol7 bool") + tdSql.execute( + "alter table dt add column tbcol8 nchar(20)") + tdSql.execute( + "alter table dt add column tbcol9 binary(20)") + + # restart taosd + tdDnodes.forcestop(1) + tdDnodes.start(1) + + tdSql.query("select * from st") + tdSql.checkRows(0) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index dfa3b55048..0bacd63ec1 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -126,6 +126,9 @@ python3 ./test.py -f import_merge/importInsertThenImport.py python3 ./test.py -f user/user_create.py python3 ./test.py -f user/pass_len.py +# stable +python3 ./test.py -f stable/query_after_reset.py + # table python3 ./test.py -f table/del_stable.py @@ -138,7 +141,11 @@ python3 ./test.py -f query/filterAllIntTypes.py python3 ./test.py -f query/filterFloatAndDouble.py python3 ./test.py -f query/filterOtherTypes.py python3 ./test.py -f query/querySort.py +python3 ./test.py -f query/queryJoin.py #stream python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream2.py + +#alter table +python3 ./test.py -f alter/alter_table_crash.py \ No newline at end of file diff --git a/tests/pytest/stable/query_after_reset.py b/tests/pytest/stable/query_after_reset.py new file mode 100644 index 0000000000..2bc171ae5d --- /dev/null +++ b/tests/pytest/stable/query_after_reset.py @@ -0,0 +1,177 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import random +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class Test: + def __init__(self): + self.current_tb = "" + self.last_tb = "" + self.written = 0 + + def create_table(self): + tdLog.info("create a table") + self.current_tb = "tb%d" % int(round(time.time() * 1000)) + tdLog.info("current table %s" % self.current_tb) + + if (self.current_tb == self.last_tb): + return + else: + tdSql.execute( + 'create table %s (ts timestamp, speed int)' % + self.current_tb) + self.last_tb = self.current_tb + self.written = 0 + + def insert_data(self): + tdLog.info("will insert data to table") + if (self.current_tb == ""): + tdLog.info("no table, create first") + self.create_table() + + tdLog.info("insert data to table") + insertRows = 10 + tdLog.info("insert %d rows to %s" % (insertRows, self.last_tb)) + for i in range(0, insertRows): + ret = tdSql.execute( + 'insert into %s values (now + %dm, %d)' % + (self.last_tb, i, i)) + self.written = self.written + 1 + + tdLog.info("insert earlier data") + tdSql.execute('insert into %s values (now - 5m , 10)' % self.last_tb) + self.written = self.written + 1 + tdSql.execute('insert into %s values (now - 6m , 10)' % self.last_tb) + self.written = self.written + 1 + tdSql.execute('insert into %s values (now - 7m , 10)' % self.last_tb) + self.written = self.written + 1 + tdSql.execute('insert into %s values (now - 8m , 10)' % self.last_tb) + self.written = self.written + 1 + + def query_data(self): + if (self.written > 0): + tdLog.info("query data from table") + tdSql.query("select * from %s" % self.last_tb) + tdSql.checkRows(self.written) + + def query_stable(self): + tdLog.info("query super table") + tdSql.query("select * from st") + tdSql.checkRows(1) + + def create_stable(self): + tdLog.info("create a super table and sub-table and insert data") + tdSql.execute( + "create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50))") + tdSql.execute( + 'CREATE TABLE if not exists dev_001 using st tags("dev_01")') + tdSql.execute( + "INSERT INTO dev_001(ts, tagtype) VALUES('2020-05-13 10:00:00.000', 1)") + + def stop_database(self): + tdLog.info("stop databae") + tdDnodes.stop(1) + + def restart_database(self): + tdLog.info("restart databae") + tdDnodes.stop(1) + tdDnodes.start(1) + tdLog.sleep(5) + + def force_restart(self): + tdLog.info("force restart database") + tdDnodes.forcestop(1) + tdDnodes.start(1) + tdLog.sleep(5) + + def drop_table(self): + if (self.current_tb != ""): + tdLog.info("drop current tb %s" % self.current_tb) + tdSql.execute("drop table %s" % self.current_tb) + self.current_tb = "" + self.last_tb = "" + self.written = 0 + + def reset_query_cache(self): + tdLog.info("reset query cache") + tdSql.execute("reset query cache") + tdLog.sleep(1) + + def reset_database(self): + tdLog.info("reset database") + tdDnodes.forcestop(1) + tdDnodes.deploy(1) + self.current_tb = "" + self.last_tb = "" + self.written = 0 + tdDnodes.start(1) + tdSql.prepare() + + def delete_datafiles(self): + tdLog.info("delete data files") + dnodesDir = tdDnodes.getDnodesRootDir() + dataDir = dnodesDir + '/dnode1/*' + deleteCmd = 'rm -rf %s' % dataDir + os.system(deleteCmd) + + self.current_tb = "" + self.last_tb = "" + self.written = 0 + tdDnodes.start(1) + tdSql.prepare() + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + test = Test() + + switch = { + 1: test.create_table, + 2: test.insert_data, + 3: test.query_data, + 4: test.create_stable, + 5: test.restart_database, + 6: test.force_restart, + 7: test.drop_table, + 8: test.reset_query_cache, + 9: test.reset_database, + 10: test.delete_datafiles, + 11: test.query_stable, + 12: test.stop_database, + } + + switch.get(4, lambda: "ERROR")() + switch.get(12, lambda: "ERROR")() + switch.get(10, lambda: "ERROR")() + switch.get(5, lambda: "ERROR")() + switch.get(11, lambda: "ERROR")() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())