fix multi nchar writing

This commit is contained in:
Minglei Jin 2024-08-20 13:55:02 +08:00
parent bf4f0bc377
commit b9303b0ed4
2 changed files with 142 additions and 78 deletions

View File

@ -1915,25 +1915,23 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
int32_t code = 0;
for (int i = 0; i < bindv->count; ++i) {
char *tbname = bindv->tbnames[i];
TAOS_STMT2_BIND *tags = bindv->tags[i];
TAOS_STMT2_BIND *bind = bindv->bind_cols[i];
if (tbname) {
code = stmtSetTbName2(stmt, tbname);
if (bindv->tbnames && bindv->tbnames[i]) {
code = stmtSetTbName2(stmt, bindv->tbnames[i]);
if (code) {
return code;
}
}
if (tags) {
code = stmtSetTbTags2(stmt, tags);
if (bindv->tags && bindv->tags[i]) {
code = stmtSetTbTags2(stmt, bindv->tags[i]);
if (code) {
return code;
}
}
if (bind) {
if (bindv->bind_cols && bindv->bind_cols[i]) {
TAOS_STMT2_BIND *bind = bindv->bind_cols[i];
if (bind->num <= 0 || bind->num > INT16_MAX) {
tscError("invalid bind num %d", bind->num);
terrno = TSDB_CODE_INVALID_PARA;

View File

@ -587,6 +587,141 @@ end:
return code;
}
static int32_t convertStmtStbNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst) {
int32_t output = 0;
int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num;
// if (dst->buffer_length < newBuflen) {
// dst->buffer = taosMemoryRealloc(dst->buffer, newBuflen);
dst->buffer = taosMemoryCalloc(1, newBuflen);
if (NULL == dst->buffer) {
return TSDB_CODE_OUT_OF_MEMORY;
}
//}
// if (NULL == dst->length) {
// dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num);
dst->length = taosMemoryCalloc(1, sizeof(int32_t) * src->num);
if (NULL == dst->length) {
taosMemoryFreeClear(dst->buffer);
return TSDB_CODE_OUT_OF_MEMORY;
}
//}
dst->buffer_length = pSchema->bytes - VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < src->num; ++i) {
if (src->is_null && src->is_null[i]) {
continue;
}
if (!taosMbsToUcs4(((char*)src->buffer) + src->buffer_length * i, src->length[i],
(TdUcs4*)(((char*)dst->buffer) + dst->buffer_length * i), dst->buffer_length, &output)) {
if (errno == E2BIG) {
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return buildSyntaxErrMsg(pMsgBuf, buf, NULL);
}
dst->length[i] = output;
}
dst->buffer_type = src->buffer_type;
dst->is_null = src->is_null;
dst->num = src->num;
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo2* pBindInfos) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = bind->num;
SArray* ncharBinds = NULL;
TAOS_STMT2_BIND ncharBind = {0};
TAOS_STMT2_BIND* pBind = NULL;
int32_t code = 0;
int16_t lastColId = -1;
bool colInOrder = true;
if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
if (pColSchema->colId <= lastColId) {
colInOrder = false;
} else {
lastColId = pColSchema->colId;
}
// SColData* pCol = taosArrayGet(pCols, c);
if (bind[c].num != rowNum) {
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
goto _return;
}
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
}
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtStbNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind);
if (code) {
goto _return;
}
if (!ncharBinds) {
ncharBinds = taosArrayInit(1, sizeof(ncharBind));
if (!ncharBinds) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
}
if (!taosArrayPush(ncharBinds, &ncharBind)) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
// pBind = &ncharBind;
pBind = taosArrayGetLast(ncharBinds);
} else {
pBind = bind + c;
}
pBindInfos[c].columnId = pColSchema->colId;
pBindInfos[c].bind = pBind;
pBindInfos[c].type = pColSchema->type;
// code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes -
// VARSTR_HEADER_SIZE: -1); if (code) {
// goto _return;
// }
}
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
_return:
for (int i = 0; i < TARRAY_SIZE(ncharBinds); ++i) {
TAOS_STMT2_BIND* ncharBind = TARRAY_DATA(ncharBinds);
taosMemoryFree(ncharBind[i].buffer);
taosMemoryFree(ncharBind[i].length);
}
taosArrayDestroy(ncharBinds);
// taosMemoryFree(ncharBind.buffer);
// taosMemoryFree(ncharBind.length);
return code;
}
static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst) {
int32_t output = 0;
int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num;
@ -632,75 +767,6 @@ static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STM
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo2* pBindInfos) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = bind->num;
TAOS_STMT2_BIND ncharBind = {0};
TAOS_STMT2_BIND* pBind = NULL;
int32_t code = 0;
int16_t lastColId = -1;
bool colInOrder = true;
if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
if (pColSchema->colId <= lastColId) {
colInOrder = false;
} else {
lastColId = pColSchema->colId;
}
// SColData* pCol = taosArrayGet(pCols, c);
if (bind[c].num != rowNum) {
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
goto _return;
}
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
}
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind);
if (code) {
goto _return;
}
pBind = &ncharBind;
} else {
pBind = bind + c;
}
pBindInfos[c].columnId = pColSchema->colId;
pBindInfos[c].bind = pBind;
pBindInfos[c].type = pColSchema->type;
// code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes -
// VARSTR_HEADER_SIZE: -1); if (code) {
// goto _return;
// }
}
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
_return:
taosMemoryFree(ncharBind.buffer);
taosMemoryFree(ncharBind.length);
return code;
}
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);