Merge pull request #18553 from taosdata/fix/TD-20613
fix:[TD-20612] error if write raw with some cols
This commit is contained in:
commit
d20d1e07a7
|
@ -307,6 +307,7 @@ DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
||||||
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
|
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
|
||||||
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
|
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
|
||||||
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
|
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
|
||||||
|
DLL_EXPORT int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields);
|
||||||
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
|
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
|
||||||
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
||||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
|
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
|
||||||
|
|
|
@ -1645,7 +1645,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
|
||||||
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
|
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
|
||||||
char* p = (char*)pResultInfo->pData;
|
char* p = (char*)pResultInfo->pData;
|
||||||
|
|
||||||
// version + length + numOfRows + numOfCol + groupId + flag_segment + column_info
|
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
|
||||||
int32_t len = getVersion1BlockMetaSize(p, numOfCols);
|
int32_t len = getVersion1BlockMetaSize(p, numOfCols);
|
||||||
int32_t* colLength = (int32_t*)(p + len);
|
int32_t* colLength = (int32_t*)(p + len);
|
||||||
len += sizeof(int32_t) * numOfCols;
|
len += sizeof(int32_t) * numOfCols;
|
||||||
|
|
|
@ -1211,6 +1211,208 @@ static void destroyVgHash(void* data) {
|
||||||
taosMemoryFreeClear(vgData->data);
|
taosMemoryFreeClear(vgData->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields){
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
STableMeta* pTableMeta = NULL;
|
||||||
|
SQuery* pQuery = NULL;
|
||||||
|
SSubmitReq* subReq = NULL;
|
||||||
|
|
||||||
|
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
|
||||||
|
if (!pRequest) {
|
||||||
|
uError("WriteRaw:createRequest error request is null");
|
||||||
|
code = terrno;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRequest->syncQuery = true;
|
||||||
|
if (!pRequest->pDb) {
|
||||||
|
uError("WriteRaw:not use db");
|
||||||
|
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||||
|
tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
|
||||||
|
tstrncpy(pName.tname, tbname, sizeof(pName.tname));
|
||||||
|
|
||||||
|
struct SCatalog* pCatalog = NULL;
|
||||||
|
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("WriteRaw: get gatlog error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestConnInfo conn = {0};
|
||||||
|
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||||
|
conn.requestId = pRequest->requestId;
|
||||||
|
conn.requestObjRefId = pRequest->self;
|
||||||
|
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
SVgroupInfo vgData = {0};
|
||||||
|
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
|
||||||
|
uint64_t uid = pTableMeta->uid;
|
||||||
|
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
|
||||||
|
|
||||||
|
uint16_t fLen = 0;
|
||||||
|
int32_t rowSize = 0;
|
||||||
|
int16_t nVar = 0;
|
||||||
|
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||||
|
SSchema* schema = pTableMeta->schema + i;
|
||||||
|
fLen += TYPE_BYTES[schema->type];
|
||||||
|
rowSize += schema->bytes;
|
||||||
|
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||||
|
nVar++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fLen -= sizeof(TSKEY);
|
||||||
|
|
||||||
|
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
|
||||||
|
(int32_t)TD_BITMAP_BYTES(numOfCols - 1);
|
||||||
|
int32_t schemaLen = 0;
|
||||||
|
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||||
|
|
||||||
|
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||||
|
subReq = taosMemoryCalloc(1, totalLen);
|
||||||
|
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
|
||||||
|
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
||||||
|
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||||
|
|
||||||
|
SRowBuilder rb = {0};
|
||||||
|
tdSRowInit(&rb, pTableMeta->sversion);
|
||||||
|
tdSRowSetTpInfo(&rb, numOfCols, fLen);
|
||||||
|
int32_t dataLen = 0;
|
||||||
|
|
||||||
|
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
|
||||||
|
char* pStart = pData + getVersion1BlockMetaSize(pData, numFields);
|
||||||
|
int32_t* colLength = (int32_t*)pStart;
|
||||||
|
pStart += sizeof(int32_t) * numFields;
|
||||||
|
|
||||||
|
SResultColumn* pCol = taosMemoryCalloc(numFields, sizeof(SResultColumn));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numFields; ++i) {
|
||||||
|
if (IS_VAR_DATA_TYPE(fields[i].type)) {
|
||||||
|
pCol[i].offset = (int32_t*)pStart;
|
||||||
|
pStart += rows * sizeof(int32_t);
|
||||||
|
} else {
|
||||||
|
pCol[i].nullbitmap = pStart;
|
||||||
|
pStart += BitmapLen(rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pCol[i].pData = pStart;
|
||||||
|
pStart += colLength[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
for (int i = 0; i < numFields; i++) {
|
||||||
|
TAOS_FIELD* schema = &fields[i];
|
||||||
|
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
|
tdSRowResetBuf(&rb, rowData);
|
||||||
|
int32_t offset = 0;
|
||||||
|
for (int32_t k = 0; k < numOfCols; k++) {
|
||||||
|
const SSchema* pColumn = &pTableMeta->schema[k];
|
||||||
|
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
||||||
|
if (!index) { // add none
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
|
||||||
|
}else{
|
||||||
|
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||||
|
if (pCol[*index].offset[j] != -1) {
|
||||||
|
char* data = pCol[*index].pData + pCol[*index].offset[j];
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
|
||||||
|
} else {
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!colDataIsNull_f(pCol[*index].nullbitmap, j)) {
|
||||||
|
char* data = pCol[*index].pData + pColumn->bytes * j;
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
|
||||||
|
} else {
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
offset += TYPE_BYTES[pColumn->type];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tdSRowEnd(&rb);
|
||||||
|
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||||
|
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||||
|
dataLen += rowLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashCleanup(schemaHash);
|
||||||
|
taosMemoryFree(pCol);
|
||||||
|
|
||||||
|
blk->uid = htobe64(uid);
|
||||||
|
blk->suid = htobe64(suid);
|
||||||
|
blk->sversion = htonl(pTableMeta->sversion);
|
||||||
|
blk->schemaLen = htonl(schemaLen);
|
||||||
|
blk->numOfRows = htonl(rows);
|
||||||
|
blk->dataLen = htonl(dataLen);
|
||||||
|
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
|
||||||
|
subReq->numOfBlocks = 1;
|
||||||
|
|
||||||
|
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||||
|
if (NULL == pQuery) {
|
||||||
|
uError("create SQuery error");
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
|
pQuery->haveResultSet = false;
|
||||||
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
|
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||||
|
if (NULL == pQuery->pRoot) {
|
||||||
|
uError("create pQuery->pRoot error");
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||||
|
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
|
||||||
|
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||||
|
if (NULL == dst) {
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
dst->vg = vgData;
|
||||||
|
dst->numOfTables = subReq->numOfBlocks;
|
||||||
|
dst->size = subReq->length;
|
||||||
|
dst->pData = (char*)subReq;
|
||||||
|
subReq->header.vgId = htonl(dst->vg.vgId);
|
||||||
|
subReq->version = htonl(1);
|
||||||
|
subReq->header.contLen = htonl(subReq->length);
|
||||||
|
subReq->length = htonl(subReq->length);
|
||||||
|
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
||||||
|
subReq = NULL; // no need free
|
||||||
|
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
||||||
|
|
||||||
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
|
code = pRequest->code;
|
||||||
|
|
||||||
|
end:
|
||||||
|
taosMemoryFreeClear(pTableMeta);
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
taosMemoryFree(subReq);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
|
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
|
@ -1293,6 +1495,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
|
||||||
tdSRowSetTpInfo(&rb, numOfCols, fLen);
|
tdSRowSetTpInfo(&rb, numOfCols, fLen);
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
|
||||||
|
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
|
||||||
char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols);
|
char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols);
|
||||||
int32_t* colLength = (int32_t*)pStart;
|
int32_t* colLength = (int32_t*)pStart;
|
||||||
pStart += sizeof(int32_t) * numOfCols;
|
pStart += sizeof(int32_t) * numOfCols;
|
||||||
|
@ -1577,7 +1780,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
const SSchema* pColumn = &pTableMeta->schema[k];
|
||||||
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
||||||
if (!index) {
|
if (!index) {
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
|
||||||
} else {
|
} else {
|
||||||
char* colData = rspObj.resInfo.row[*index];
|
char* colData = rspObj.resInfo.row[*index];
|
||||||
if (!colData) {
|
if (!colData) {
|
||||||
|
@ -1670,6 +1873,7 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SHashObj* pVgHash = NULL;
|
SHashObj* pVgHash = NULL;
|
||||||
|
@ -1882,7 +2086,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
const SSchema* pColumn = &pTableMeta->schema[k];
|
||||||
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
||||||
if (!index) {
|
if (!index) {
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
|
||||||
} else {
|
} else {
|
||||||
char* colData = rspObj.resInfo.row[*index];
|
char* colData = rspObj.resInfo.row[*index];
|
||||||
if (!colData) {
|
if (!colData) {
|
||||||
|
|
|
@ -76,6 +76,32 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
|
int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
|
||||||
|
|
||||||
|
/* test for TD-20612 start*/
|
||||||
|
// pRes = taos_query(pConn,"create table tb1 (ts timestamp, c1 int, c2 int)");
|
||||||
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
|
// pRes = taos_query(pConn,"insert into tb1 (ts, c1) values(1669092069069, 0)");
|
||||||
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
|
// pRes = taos_query(pConn,"insert into tb1 (ts, c2) values(1669092069069, 1)");
|
||||||
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
|
// return 0;
|
||||||
|
/* test for TD-20612 end*/
|
||||||
|
|
||||||
pRes = taos_query(pConn,
|
pRes = taos_query(pConn,
|
||||||
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
||||||
"nchar(8), t4 bool)");
|
"nchar(8), t4 bool)");
|
||||||
|
|
Loading…
Reference in New Issue