fix:[TD-20612] error if write raw with some cols
This commit is contained in:
parent
9b5d78d23d
commit
5bed172337
|
@ -1294,7 +1294,6 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
|
|||
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, pTableMeta->sversion);
|
||||
rb.rowType = TD_ROW_KV;
|
||||
tdSRowSetTpInfo(&rb, numOfCols, fLen);
|
||||
int32_t dataLen = 0;
|
||||
|
||||
|
@ -1705,8 +1704,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
uint16_t fLen = 0;
|
||||
int32_t rowSize = 0;
|
||||
int16_t nVar = 0;
|
||||
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
SSchema* schema = &pTableMeta->schema[i];
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
SSchema* schema = &pSW->pSchema[i];
|
||||
fLen += TYPE_BYTES[schema->type];
|
||||
rowSize += schema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||
|
@ -1717,7 +1716,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
|
||||
int32_t rows = rspObj.resInfo.numOfRows;
|
||||
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
|
||||
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
|
||||
(int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
|
||||
int32_t schemaLen = 0;
|
||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||
|
||||
|
@ -1764,14 +1763,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, sver);
|
||||
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
|
||||
tdSRowSetTpInfo(&rb, pSW->nCols, fLen);
|
||||
int32_t totalLen = 0;
|
||||
|
||||
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
SSchema* schema = &pSW->pSchema[i];
|
||||
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
|
||||
}
|
||||
// SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
// for (int i = 0; i < pSW->nCols; i++) {
|
||||
// SSchema* schema = &pSW->pSchema[i];
|
||||
// taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
|
||||
// }
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
tdSRowResetBuf(&rb, rowData);
|
||||
|
@ -1780,22 +1779,29 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
rspObj.resInfo.current += 1;
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
|
||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
||||
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
||||
if (!index) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
char* colData = rspObj.resInfo.row[*index];
|
||||
if (!colData) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||
colData -= VARSTR_HEADER_SIZE;
|
||||
}
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
|
||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||
const SSchema* pColumn = NULL;
|
||||
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
|
||||
if (strcmp(pTableMeta->schema[k].name, pSW->pSchema[i].name) == 0) {
|
||||
pColumn = &pTableMeta->schema[k];
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (pColumn == NULL){
|
||||
uError("column not exist:%s", pSW->pSchema[i].name);
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto end;
|
||||
}
|
||||
char* colData = rspObj.resInfo.row[i];
|
||||
if (!colData) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||
colData -= VARSTR_HEADER_SIZE;
|
||||
}
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
|
||||
}
|
||||
|
||||
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
offset += TYPE_BYTES[pColumn->type];
|
||||
}
|
||||
|
@ -1806,7 +1812,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
totalLen += rowLen;
|
||||
}
|
||||
|
||||
taosHashCleanup(schemaHash);
|
||||
blk->uid = htobe64(uid);
|
||||
blk->suid = htobe64(suid);
|
||||
blk->sversion = htonl(sver);
|
||||
|
|
Loading…
Reference in New Issue