diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 27da0c13d2..95f522f504 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -156,8 +156,8 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt); int32_t qStmtBindParams2(SQuery* pQuery, TAOS_STMT2_BIND* pParams, int32_t colIdx, void* charsetCxt); -// int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, -// STSchema** pTSchema, SBindInfo2* pBindInfos, void *charsetCxt); +int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, + STSchema** pTSchema, SBindInfo2* pBindInfos, void *charsetCxt); int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt); int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum, void *charsetCxt); diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 1d9acc6982..820a7a0110 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -58,13 +58,13 @@ static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { return true; } -// static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { -// pStmt->queue.tail->next = param; -// pStmt->queue.tail = param; +static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { + pStmt->queue.tail->next = param; + pStmt->queue.tail = param; -// pStmt->stat.bindDataNum++; -// (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); -// } + pStmt->stat.bindDataNum++; + (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); +} static int32_t stmtCreateRequest(STscStmt2* pStmt) { int32_t code = 0; @@ -76,7 +76,7 @@ static int32_t stmtCreateRequest(STscStmt2* pStmt) { pStmt->reqid++; } if (pStmt->db != NULL) { - taosMemoryFreeClear(pStmt->exec.pRequest->pDb); + taosMemoryFreeClear(pStmt->exec.pRequest->pDb); pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db); } if (TSDB_CODE_SUCCESS == code) { @@ -284,17 +284,16 @@ static int32_t stmtParseSql(STscStmt2* pStmt) { STableDataCxt* pTableCtx = *pSrc; if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // int16_t lastIdx = -1; + int16_t lastIdx = -1; - // for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) { - // if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) { - // pStmt->sql.stbInterlaceMode = false; - // break; - // } + for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) { + if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) { + pStmt->sql.stbInterlaceMode = false; + break; + } - // lastIdx = pTableCtx->boundColsInfo.pColIndex[i]; - // } + lastIdx = pTableCtx->boundColsInfo.pColIndex[i]; + } } if (NULL == pStmt->sql.pBindInfo) { @@ -326,10 +325,10 @@ static int32_t stmtCleanBindInfo(STscStmt2* pStmt) { return TSDB_CODE_SUCCESS; } -// static void stmtFreeTableBlkList(STableColsData* pTb) { -// (void)qResetStmtColumns(pTb->aCol, true); -// taosArrayDestroy(pTb->aCol); -// } +static void stmtFreeTableBlkList(STableColsData* pTb) { + (void)qResetStmtColumns(pTb->aCol, true); + taosArrayDestroy(pTb->aCol); +} static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0); @@ -498,10 +497,9 @@ static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, static int32_t stmtGetFromCache(STscStmt2* pStmt) { if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // pStmt->bInfo.needParse = false; - // pStmt->bInfo.inExecCache = false; - // return TSDB_CODE_SUCCESS; + pStmt->bInfo.needParse = false; + pStmt->bInfo.inExecCache = false; + return TSDB_CODE_SUCCESS; } pStmt->bInfo.needParse = true; @@ -897,32 +895,32 @@ int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) { return TSDB_CODE_SUCCESS; } -// static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) { -// STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); -// if (!pSrc) { -// return terrno; -// } -// STableDataCxt* pDst = NULL; +static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) { + STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (!pSrc) { + return terrno; + } + STableDataCxt* pDst = NULL; -// STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true)); -// pStmt->sql.siInfo.pDataCtx = pDst; + STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true)); + pStmt->sql.siInfo.pDataCtx = pDst; -// SArray* pTblCols = NULL; -// for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) { -// pTblCols = taosArrayInit(20, POINTER_BYTES); -// if (NULL == pTblCols) { -// return terrno; -// } + SArray* pTblCols = NULL; + for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) { + pTblCols = taosArrayInit(20, POINTER_BYTES); + if (NULL == pTblCols) { + return terrno; + } -// if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) { -// return terrno; -// } -// } + if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) { + return terrno; + } + } -// pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags; + pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags; -// return TSDB_CODE_SUCCESS; -// } + return TSDB_CODE_SUCCESS; +} int stmtIsInsert2(TAOS_STMT2* stmt, int* insert) { STscStmt2* pStmt = (STscStmt2*)stmt; @@ -981,8 +979,7 @@ int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) { } if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); + STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); } int64_t startUs2 = taosGetTimestampUs(); @@ -1012,8 +1009,7 @@ int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) { STMT_ERR_RET(stmtParseSql(pStmt)); } if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); + STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); } SBoundColInfo* tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags; @@ -1041,33 +1037,33 @@ int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) { return TSDB_CODE_SUCCESS; } -// static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { -// if (pStmt->errCode != TSDB_CODE_SUCCESS) { -// return pStmt->errCode; -// } +static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { + if (pStmt->errCode != TSDB_CODE_SUCCESS) { + return pStmt->errCode; + } -// if (STMT_TYPE_QUERY == pStmt->sql.type) { -// tscError("invalid operation to get query column fileds"); -// STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); -// } + if (STMT_TYPE_QUERY == pStmt->sql.type) { + tscError("invalid operation to get query column fileds"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); + } -// STableDataCxt** pDataBlock = NULL; + STableDataCxt** pDataBlock = NULL; -// if (pStmt->sql.stbInterlaceMode) { -// pDataBlock = &pStmt->sql.siInfo.pDataCtx; -// } else { -// pDataBlock = -// (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); -// if (NULL == pDataBlock) { -// tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); -// STMT_ERR_RET(TSDB_CODE_APP_ERROR); -// } -// } + if (pStmt->sql.stbInterlaceMode) { + pDataBlock = &pStmt->sql.siInfo.pDataCtx; + } else { + pDataBlock = + (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (NULL == pDataBlock) { + tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); + STMT_ERR_RET(TSDB_CODE_APP_ERROR); + } + } -// STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields)); + STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields)); -// return TSDB_CODE_SUCCESS; -// } + return TSDB_CODE_SUCCESS; +} static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) { if (pStmt->errCode != TSDB_CODE_SUCCESS) { @@ -1082,8 +1078,7 @@ static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIEL STableDataCxt** pDataBlock = NULL; if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // pDataBlock = &pStmt->sql.siInfo.pDataCtx; + pDataBlock = &pStmt->sql.siInfo.pDataCtx; } else { pDataBlock = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); @@ -1122,69 +1117,69 @@ SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) { } } */ -// static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) { -// if (NULL == pStmt->sql.siInfo.pVgroupHash) { -// pStmt->sql.siInfo.pVgroupHash = -// taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); -// } -// if (NULL == pStmt->sql.siInfo.pVgroupList) { -// pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES); -// } +static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) { + if (NULL == pStmt->sql.siInfo.pVgroupHash) { + pStmt->sql.siInfo.pVgroupHash = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + } + if (NULL == pStmt->sql.siInfo.pVgroupList) { + pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES); + } -// if (NULL == pStmt->sql.siInfo.pRequest) { -// STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, -// (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid)); + if (NULL == pStmt->sql.siInfo.pRequest) { + STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, + (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid)); -// if (pStmt->reqid != 0) { -// pStmt->reqid++; -// } -// pStmt->exec.pRequest->syncQuery = true; + if (pStmt->reqid != 0) { + pStmt->reqid++; + } + pStmt->exec.pRequest->syncQuery = true; -// pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId; -// pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self; -// } + pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId; + pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self; + } -// if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] && -// 0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) { -// pStmt->sql.siInfo.tbFromHash = true; -// } + if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] && + 0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) { + pStmt->sql.siInfo.tbFromHash = true; + } -// if (0 == pStmt->sql.siInfo.firstName[0]) { -// tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN); -// } + if (0 == pStmt->sql.siInfo.firstName[0]) { + tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN); + } -// param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash; -// param->next = NULL; + param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash; + param->next = NULL; -// (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); + (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); -// stmtEnqueue(pStmt, param); + stmtEnqueue(pStmt, param); -// return TSDB_CODE_SUCCESS; -// } + return TSDB_CODE_SUCCESS; +} -// static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) { -// while (true) { -// if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) { -// *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++); -// break; -// } else { -// SArray* pTblCols = NULL; -// for (int32_t i = 0; i < 100; i++) { -// pTblCols = taosArrayInit(20, POINTER_BYTES); -// if (NULL == pTblCols) { -// return terrno; -// } +static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) { + while (true) { + if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) { + *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++); + break; + } else { + SArray* pTblCols = NULL; + for (int32_t i = 0; i < 100; i++) { + pTblCols = taosArrayInit(20, POINTER_BYTES); + if (NULL == pTblCols) { + return terrno; + } -// if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) { -// return terrno; -// } -// } -// } -// } + if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) { + return terrno; + } + } + } + } -// return TSDB_CODE_SUCCESS; -// } + return TSDB_CODE_SUCCESS; +} static int32_t stmtCacheBlock(STscStmt2* pStmt) { if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) { @@ -1238,21 +1233,19 @@ static int stmtAddBatch2(TAOS_STMT2* stmt) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // todo:add stb interlace mode - // int64_t startUs2 = taosGetTimestampUs(); - // pStmt->stat.addBatchUs += startUs2 - startUs; + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.addBatchUs += startUs2 - startUs; - // pStmt->sql.siInfo.tableColsReady = false; + pStmt->sql.siInfo.tableColsReady = false; - // SStmtQNode* param = NULL; - // STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); - // param->restoreTbCols = true; - // param->next = NULL; + SStmtQNode* param = NULL; + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); + param->restoreTbCols = true; + param->next = NULL; - // stmtEnqueue(pStmt, param); + stmtEnqueue(pStmt, param); - // return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } STMT_ERR_RET(stmtCacheBlock(pStmt)); @@ -1373,8 +1366,7 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { } if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); + STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); } STableDataCxt** pDataBlock = NULL; @@ -1400,15 +1392,14 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { SStmtQNode* param = NULL; if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); - // STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, ¶m->tblData.aCol)); - // taosArrayClear(param->tblData.aCol); + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); + STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, ¶m->tblData.aCol)); + taosArrayClear(param->tblData.aCol); - // // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES); + // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES); - // param->restoreTbCols = false; - // tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN); + param->restoreTbCols = false; + tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN); } int64_t startUs3 = taosGetTimestampUs(); @@ -1418,15 +1409,12 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { if (colIdx < 0) { if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // todo - // (*pDataBlock)->pData->flags = 0; - // code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, - // pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, - // pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt); + (*pDataBlock)->pData->flags = 0; + code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, + pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt); } else { - code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, - pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt); + code = + qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt); } if (code) { @@ -1435,9 +1423,8 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { } } else { if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // tscError("bind single column not allowed in stb insert mode"); - // STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); + tscError("bind single column not allowed in stb insert mode"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) { @@ -1452,8 +1439,7 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { } code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, - pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum, - pStmt->taos->optionInfo.charsetCxt); + pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum, pStmt->taos->optionInfo.charsetCxt); if (code) { tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code)); STMT_ERR_RET(code); @@ -1464,9 +1450,7 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { pStmt->stat.bindDataUs3 += startUs4 - startUs3; if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // to do - // STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param)); + STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param)); } else { STMT_ERR_RET(stmtAddBatch2(pStmt)); } @@ -1674,26 +1658,23 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { } if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // STMT_ERR_RET(stmtAddBatch2(pStmt)); + STMT_ERR_RET(stmtAddBatch2(pStmt)); } STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); if (STMT_TYPE_QUERY != pStmt->sql.type) { if (pStmt->sql.stbInterlaceMode) { - return TSDB_CODE_TSC_STMT_API_ERROR; - // todo:stb interlace mode - // int64_t startTs = taosGetTimestampUs(); - // while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) { - // taosUsleep(1); - // } - // pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs; + int64_t startTs = taosGetTimestampUs(); + while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) { + taosUsleep(1); + } + pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs; - // STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList)); - // taosHashCleanup(pStmt->sql.siInfo.pVgroupHash); - // pStmt->sql.siInfo.pVgroupHash = NULL; - // pStmt->sql.siInfo.pVgroupList = NULL; + STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList)); + taosHashCleanup(pStmt->sql.siInfo.pVgroupHash); + pStmt->sql.siInfo.pVgroupHash = NULL; + pStmt->sql.siInfo.pVgroupList = NULL; } else { tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); taosMemoryFreeClear(pStmt->exec.pCurrTbData); @@ -1895,11 +1876,11 @@ int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) { STMT_ERR_RET(stmtParseSql(pStmt)); } - // if (STMT_TYPE_QUERY == pStmt->sql.type) { - *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues); - // } else { - // // STMT_ERR_RET(stmtFetchColFields2(stmt, nums, NULL)); - // } + if (STMT_TYPE_QUERY == pStmt->sql.type) { + *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues); + } else { + STMT_ERR_RET(stmtFetchColFields2(stmt, nums, NULL)); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 666892e6f5..74fac463f1 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -636,134 +636,134 @@ end: return code; } -// static int32_t convertStmtStbNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst, void *charsetCxt) { -// int32_t output = 0; -// const int32_t max_buf_len = pSchema->bytes - VARSTR_HEADER_SIZE; +static int32_t convertStmtStbNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst, void *charsetCxt) { + int32_t output = 0; + const int32_t max_buf_len = pSchema->bytes - VARSTR_HEADER_SIZE; -// dst->buffer = taosMemoryCalloc(src->num, max_buf_len); -// if (NULL == dst->buffer) { -// return terrno; -// } + dst->buffer = taosMemoryCalloc(src->num, max_buf_len); + if (NULL == dst->buffer) { + return terrno; + } -// dst->length = taosMemoryCalloc(src->num, sizeof(int32_t)); -// if (NULL == dst->length) { -// taosMemoryFreeClear(dst->buffer); -// return terrno; -// } + dst->length = taosMemoryCalloc(src->num, sizeof(int32_t)); + if (NULL == dst->length) { + taosMemoryFreeClear(dst->buffer); + return terrno; + } -// char* src_buf = src->buffer; -// char* dst_buf = dst->buffer; -// for (int32_t i = 0; i < src->num; ++i) { -// if (src->is_null && src->is_null[i]) { -// continue; -// } + char* src_buf = src->buffer; + char* dst_buf = dst->buffer; + for (int32_t i = 0; i < src->num; ++i) { + if (src->is_null && src->is_null[i]) { + continue; + } -// if (!taosMbsToUcs4(src_buf, src->length[i], (TdUcs4*)dst_buf, max_buf_len, &output, charsetCxt)) { -// if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) { -// return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); -// } -// char buf[512] = {0}; -// snprintf(buf, tListLen(buf), "%s", strerror(terrno)); -// return buildSyntaxErrMsg(pMsgBuf, buf, NULL); -// } + if (!taosMbsToUcs4(src_buf, src->length[i], (TdUcs4*)dst_buf, max_buf_len, &output, charsetCxt)) { + if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) { + return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); + } + char buf[512] = {0}; + snprintf(buf, tListLen(buf), "%s", strerror(terrno)); + return buildSyntaxErrMsg(pMsgBuf, buf, NULL); + } -// dst->length[i] = output; -// src_buf += src->length[i]; -// dst_buf += output; -// } + dst->length[i] = output; + src_buf += src->length[i]; + dst_buf += output; + } -// dst->buffer_type = src->buffer_type; -// dst->is_null = src->is_null; -// dst->num = src->num; + dst->buffer_type = src->buffer_type; + dst->is_null = src->is_null; + dst->num = src->num; -// return TSDB_CODE_SUCCESS; -// } + return TSDB_CODE_SUCCESS; +} -// int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, -// STSchema** pTSchema, SBindInfo2* pBindInfos, void *charsetCxt) { -// STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; -// SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); -// SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; -// SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; -// int32_t rowNum = bind->num; -// SArray* ncharBinds = NULL; -// TAOS_STMT2_BIND ncharBind = {0}; -// int32_t code = 0; -// int16_t lastColId = -1; -// bool colInOrder = true; -// int ncharColNums = 0; +int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, + STSchema** pTSchema, SBindInfo2* pBindInfos, void *charsetCxt) { + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); + SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + int32_t rowNum = bind->num; + SArray* ncharBinds = NULL; + TAOS_STMT2_BIND ncharBind = {0}; + int32_t code = 0; + int16_t lastColId = -1; + bool colInOrder = true; + int ncharColNums = 0; -// if (NULL == *pTSchema) { -// *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); -// } + if (NULL == *pTSchema) { + *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); + } -// for (int c = 0; c < boundInfo->numOfBound; ++c) { -// if (TSDB_DATA_TYPE_NCHAR == pSchema[boundInfo->pColIndex[c]].type) { -// ncharColNums++; -// } -// } -// if (ncharColNums > 0) { -// ncharBinds = taosArrayInit(ncharColNums, sizeof(ncharBind)); -// if (!ncharBinds) { -// code = terrno; -// goto _return; -// } -// } + for (int c = 0; c < boundInfo->numOfBound; ++c) { + if (TSDB_DATA_TYPE_NCHAR == pSchema[boundInfo->pColIndex[c]].type) { + ncharColNums++; + } + } + if (ncharColNums > 0) { + ncharBinds = taosArrayInit(ncharColNums, sizeof(ncharBind)); + if (!ncharBinds) { + code = terrno; + goto _return; + } + } -// for (int c = 0; c < boundInfo->numOfBound; ++c) { -// SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; -// if (pColSchema->colId <= lastColId) { -// colInOrder = false; -// } else { -// lastColId = pColSchema->colId; -// } + for (int c = 0; c < boundInfo->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; + if (pColSchema->colId <= lastColId) { + colInOrder = false; + } else { + lastColId = pColSchema->colId; + } -// if (bind[c].num != rowNum) { -// code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); -// goto _return; -// } + if (bind[c].num != rowNum) { + code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + goto _return; + } -// if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && -// bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type -// code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); -// goto _return; -// } + if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && + bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type + code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + goto _return; + } -// if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) { -// code = convertStmtStbNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind, charsetCxt); -// if (code) { -// goto _return; -// } -// if (!taosArrayPush(ncharBinds, &ncharBind)) { -// code = terrno; -// goto _return; -// } -// pBindInfos[c].bind = taosArrayGetLast(ncharBinds); -// } else { -// pBindInfos[c].bind = bind + c; -// } + if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) { + code = convertStmtStbNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind, charsetCxt); + if (code) { + goto _return; + } + if (!taosArrayPush(ncharBinds, &ncharBind)) { + code = terrno; + goto _return; + } + pBindInfos[c].bind = taosArrayGetLast(ncharBinds); + } else { + pBindInfos[c].bind = bind + c; + } -// pBindInfos[c].columnId = pColSchema->colId; -// pBindInfos[c].type = pColSchema->type; -// pBindInfos[c].bytes = pColSchema->bytes; -// } + pBindInfos[c].columnId = pColSchema->colId; + pBindInfos[c].type = pColSchema->type; + pBindInfos[c].bytes = pColSchema->bytes; + } -// code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs); + code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs); -// qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); + qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); -// _return: -// if (ncharBinds) { -// for (int i = 0; i < TARRAY_SIZE(ncharBinds); ++i) { -// TAOS_STMT2_BIND* ncBind = TARRAY_DATA(ncharBinds); -// taosMemoryFree(ncBind[i].buffer); -// taosMemoryFree(ncBind[i].length); -// } -// taosArrayDestroy(ncharBinds); -// } +_return: + if (ncharBinds) { + for (int i = 0; i < TARRAY_SIZE(ncharBinds); ++i) { + TAOS_STMT2_BIND* ncBind = TARRAY_DATA(ncharBinds); + taosMemoryFree(ncBind[i].buffer); + taosMemoryFree(ncBind[i].length); + } + taosArrayDestroy(ncharBinds); + } -// return code; -// } + return code; +} static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst, void *charsetCxt) { int32_t output = 0;