From bd84ecc3618631fc3b45a8b8653d948641d78231 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 28 Nov 2022 16:31:33 +0800 Subject: [PATCH 1/6] fix:[TD-20613] add interface for write raw block with sdfasdf --- include/client/taos.h | 1 + include/os/os.h | 3 +- source/client/src/clientImpl.c | 2 +- source/client/src/clientRawBlockWrite.c | 206 ++++++++++++++++++++++++ 4 files changed, 210 insertions(+), 2 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 25887b2879..677c830ff7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -297,6 +297,7 @@ DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw); DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw); DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname); +DLL_EXPORT int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields); DLL_EXPORT void tmq_free_raw(tmq_raw_data raw); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); diff --git a/include/os/os.h b/include/os/os.h index ab4d0a406e..629b0e27c6 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -43,12 +43,13 @@ extern "C" { #include #include #include -#include #if defined(DARWIN) #else #include #include +#include + #endif #else diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3140371c4..ec94520fb0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1645,7 +1645,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) { static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) { char* p = (char*)pResultInfo->pData; - // version + length + numOfRows + numOfCol + groupId + flag_segment + column_info + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | int32_t len = getVersion1BlockMetaSize(p, numOfCols); int32_t* colLength = (int32_t*)(p + len); len += sizeof(int32_t) * numOfCols; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 6a6a5d56d7..ea0b900e68 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1211,6 +1211,211 @@ static void destroyVgHash(void* data) { taosMemoryFreeClear(vgData->data); } +int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields){ + int32_t code = TSDB_CODE_SUCCESS; + STableMeta* pTableMeta = NULL; + SQuery* pQuery = NULL; + SSubmitReq* subReq = NULL; + + SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); + if (!pRequest) { + uError("WriteRaw:createRequest error request is null"); + code = terrno; + goto end; + } + + pRequest->syncQuery = true; + if (!pRequest->pDb) { + uError("WriteRaw:not use db"); + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; + } + + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname)); + tstrncpy(pName.tname, tbname, sizeof(pName.tname)); + + struct SCatalog* pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw: get gatlog error"); + goto end; + } + + SRequestConnInfo conn = {0}; + conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; + conn.requestId = pRequest->requestId; + conn.requestObjRefId = pRequest->self; + conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + + SVgroupInfo vgData = {0}; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname); + goto end; + } + + code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); + goto end; + } + uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); + uint64_t uid = pTableMeta->uid; + int32_t numOfCols = numFields; + + uint16_t fLen = 0; + int32_t rowSize = 0; + int16_t nVar = 0; + for(int k = 0; k < numOfCols; k++){ + for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { + SSchema* schema = pTableMeta->schema + i; + if(strcmp(schema->name, fields[k].name) != 0) continue; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if (IS_VAR_DATA_TYPE(schema->type)) { + nVar++; + } + } + } + + fLen -= sizeof(TSKEY); + + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + + (int32_t)TD_BITMAP_BYTES(numOfCols - 1); + int32_t schemaLen = 0; + int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; + + int32_t totalLen = sizeof(SSubmitReq) + submitLen; + subReq = taosMemoryCalloc(1, totalLen); + SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq)); + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTableMeta->sversion); + tdSRowSetTpInfo(&rb, numOfCols, fLen); + int32_t dataLen = 0; + + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | + char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols); + int32_t* colLength = (int32_t*)pStart; + pStart += sizeof(int32_t) * numOfCols; + + SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn)); + + for (int32_t i = 0; i < numOfCols; ++i) { + if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) { + pCol[i].offset = (int32_t*)pStart; + pStart += rows * sizeof(int32_t); + } else { + pCol[i].nullbitmap = pStart; + pStart += BitmapLen(rows); + } + + pCol[i].pData = pStart; + pStart += colLength[i]; + } + + for (int32_t j = 0; j < rows; j++) { + tdSRowResetBuf(&rb, rowData); + int32_t offset = 0; + for (int32_t k = 0; k < numOfCols; k++) { + const SSchema* pColumn = NULL; + for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { + if (strcmp((pTableMeta->schema + i)->name, fields[k].name) == 0) { + pColumn = pTableMeta->schema + i; + break; + } + } + if(pColumn == NULL){ + uError("column not exist:%s", fields[k].name); + code = TSDB_CODE_INVALID_PARA; + goto end; + } + + if (IS_VAR_DATA_TYPE(pColumn->type)) { + if (pCol[k].offset[j] != -1) { + char* data = pCol[k].pData + pCol[k].offset[j]; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } + } else { + if (!colDataIsNull_f(pCol[k].nullbitmap, j)) { + char* data = pCol[k].pData + pColumn->bytes * j; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } + } + + if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) { + offset += TYPE_BYTES[pColumn->type]; + } + } + tdSRowEnd(&rb); + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + dataLen += rowLen; + } + + taosMemoryFree(pCol); + + blk->uid = htobe64(uid); + blk->suid = htobe64(suid); + blk->sversion = htonl(pTableMeta->sversion); + blk->schemaLen = htonl(schemaLen); + blk->numOfRows = htonl(rows); + blk->dataLen = htonl(dataLen); + subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen; + subReq->numOfBlocks = 1; + + pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create SQuery error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == pQuery->pRoot) { + uError("create pQuery->pRoot error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); + nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES); + + SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + dst->vg = vgData; + dst->numOfTables = subReq->numOfBlocks; + dst->size = subReq->length; + dst->pData = (char*)subReq; + subReq->header.vgId = htonl(dst->vg.vgId); + subReq->version = htonl(1); + subReq->header.contLen = htonl(subReq->length); + subReq->length = htonl(subReq->length); + subReq->numOfBlocks = htonl(subReq->numOfBlocks); + subReq = NULL; // no need free + taosArrayPush(nodeStmt->pDataBlocks, &dst); + + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + + end: + taosMemoryFreeClear(pTableMeta); + qDestroyQuery(pQuery); + taosMemoryFree(subReq); + return code; +} + int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) { int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; @@ -1293,6 +1498,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) tdSRowSetTpInfo(&rb, numOfCols, fLen); int32_t dataLen = 0; + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols); int32_t* colLength = (int32_t*)pStart; pStart += sizeof(int32_t) * numOfCols; From 9b5d78d23dbb9eaf3a3135a57c2bdc69eb25bbbe Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 28 Nov 2022 16:32:06 +0800 Subject: [PATCH 2/6] fix:[TD-20613] add interface for write raw block with some fields --- source/client/src/clientRawBlockWrite.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index ea0b900e68..bcb97b7354 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1294,6 +1294,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch SRowBuilder rb = {0}; tdSRowInit(&rb, pTableMeta->sversion); + rb.rowType = TD_ROW_KV; tdSRowSetTpInfo(&rb, numOfCols, fLen); int32_t dataLen = 0; From 5bed17233782595c0c554cfbd1422ac96a10109d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 29 Nov 2022 10:31:08 +0800 Subject: [PATCH 3/6] fix:[TD-20612] error if write raw with some cols --- source/client/src/clientRawBlockWrite.c | 55 ++++++++++++++----------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index bcb97b7354..8bae3c23bc 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1294,7 +1294,6 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch SRowBuilder rb = {0}; tdSRowInit(&rb, pTableMeta->sversion); - rb.rowType = TD_ROW_KV; tdSRowSetTpInfo(&rb, numOfCols, fLen); int32_t dataLen = 0; @@ -1705,8 +1704,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { uint16_t fLen = 0; int32_t rowSize = 0; int16_t nVar = 0; - for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { - SSchema* schema = &pTableMeta->schema[i]; + for (int i = 0; i < pSW->nCols; i++) { + SSchema* schema = &pSW->pSchema[i]; fLen += TYPE_BYTES[schema->type]; rowSize += schema->bytes; if (IS_VAR_DATA_TYPE(schema->type)) { @@ -1717,7 +1716,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { int32_t rows = rspObj.resInfo.numOfRows; int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + - (int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1); + (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1); int32_t schemaLen = 0; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; @@ -1764,14 +1763,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { SRowBuilder rb = {0}; tdSRowInit(&rb, sver); - tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen); + tdSRowSetTpInfo(&rb, pSW->nCols, fLen); int32_t totalLen = 0; - SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - for (int i = 0; i < pSW->nCols; i++) { - SSchema* schema = &pSW->pSchema[i]; - taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); - } +// SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); +// for (int i = 0; i < pSW->nCols; i++) { +// SSchema* schema = &pSW->pSchema[i]; +// taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); +// } for (int32_t j = 0; j < rows; j++) { tdSRowResetBuf(&rb, rowData); @@ -1780,22 +1779,29 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { rspObj.resInfo.current += 1; int32_t offset = 0; - for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { - const SSchema* pColumn = &pTableMeta->schema[k]; - int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); - if (!index) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); - } else { - char* colData = rspObj.resInfo.row[*index]; - if (!colData) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); - } else { - if (IS_VAR_DATA_TYPE(pColumn->type)) { - colData -= VARSTR_HEADER_SIZE; - } - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); + for (int32_t i = 0; i < pSW->nCols; i++) { + const SSchema* pColumn = NULL; + for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { + if (strcmp(pTableMeta->schema[k].name, pSW->pSchema[i].name) == 0) { + pColumn = &pTableMeta->schema[k]; + break; } } + if (pColumn == NULL){ + uError("column not exist:%s", pSW->pSchema[i].name); + code = TSDB_CODE_INVALID_PARA; + goto end; + } + char* colData = rspObj.resInfo.row[i]; + if (!colData) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + if (IS_VAR_DATA_TYPE(pColumn->type)) { + colData -= VARSTR_HEADER_SIZE; + } + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); + } + if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) { offset += TYPE_BYTES[pColumn->type]; } @@ -1806,7 +1812,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { totalLen += rowLen; } - taosHashCleanup(schemaHash); blk->uid = htobe64(uid); blk->suid = htobe64(suid); blk->sversion = htonl(sver); From 829a7631b38d601ac4a2a475da37546e0624d8de Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 29 Nov 2022 10:49:47 +0800 Subject: [PATCH 4/6] fix:[TD-20612] error if write raw with some cols --- source/client/src/clientRawBlockWrite.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 8bae3c23bc..a3447dabda 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1794,12 +1794,12 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { } char* colData = rspObj.resInfo.row[i]; if (!colData) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, i); } else { if (IS_VAR_DATA_TYPE(pColumn->type)) { colData -= VARSTR_HEADER_SIZE; } - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, i); } if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) { From 8b02719ea579413ed7090360098a24a4a680eed8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 29 Nov 2022 17:01:16 +0800 Subject: [PATCH 5/6] fix:[TD-20612] error if write raw with some cols --- source/client/src/clientRawBlockWrite.c | 122 +++++++++++------------- utils/test/c/tmq_taosx_ci.c | 26 +++++ 2 files changed, 83 insertions(+), 65 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index a3447dabda..9db79c6eb6 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1262,22 +1262,19 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch } uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); uint64_t uid = pTableMeta->uid; - int32_t numOfCols = numFields; + int32_t numOfCols = pTableMeta->tableInfo.numOfColumns; uint16_t fLen = 0; int32_t rowSize = 0; int16_t nVar = 0; - for(int k = 0; k < numOfCols; k++){ for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { SSchema* schema = pTableMeta->schema + i; - if(strcmp(schema->name, fields[k].name) != 0) continue; fLen += TYPE_BYTES[schema->type]; rowSize += schema->bytes; if (IS_VAR_DATA_TYPE(schema->type)) { nVar++; } } - } fLen -= sizeof(TSKEY); @@ -1298,14 +1295,14 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch int32_t dataLen = 0; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | - char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols); + char* pStart = pData + getVersion1BlockMetaSize(pData, numFields); int32_t* colLength = (int32_t*)pStart; - pStart += sizeof(int32_t) * numOfCols; + pStart += sizeof(int32_t) * numFields; - SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn)); + SResultColumn* pCol = taosMemoryCalloc(numFields, sizeof(SResultColumn)); - for (int32_t i = 0; i < numOfCols; ++i) { - if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) { + for (int32_t i = 0; i < numFields; ++i) { + if (IS_VAR_DATA_TYPE(fields[i].type)) { pCol[i].offset = (int32_t*)pStart; pStart += rows * sizeof(int32_t); } else { @@ -1317,36 +1314,35 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch pStart += colLength[i]; } + SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (int i = 0; i < numFields; i++) { + TAOS_FIELD* schema = &fields[i]; + taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); + } + for (int32_t j = 0; j < rows; j++) { tdSRowResetBuf(&rb, rowData); int32_t offset = 0; for (int32_t k = 0; k < numOfCols; k++) { - const SSchema* pColumn = NULL; - for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { - if (strcmp((pTableMeta->schema + i)->name, fields[k].name) == 0) { - pColumn = pTableMeta->schema + i; - break; - } - } - if(pColumn == NULL){ - uError("column not exist:%s", fields[k].name); - code = TSDB_CODE_INVALID_PARA; - goto end; - } - - if (IS_VAR_DATA_TYPE(pColumn->type)) { - if (pCol[k].offset[j] != -1) { - char* data = pCol[k].pData + pCol[k].offset[j]; - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + const SSchema* pColumn = &pTableMeta->schema[k]; + int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); + if (!index) { // add none + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k); + }else{ + if (IS_VAR_DATA_TYPE(pColumn->type)) { + if (pCol[k].offset[j] != -1) { + char* data = pCol[k].pData + pCol[k].offset[j]; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } } else { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); - } - } else { - if (!colDataIsNull_f(pCol[k].nullbitmap, j)) { - char* data = pCol[k].pData + pColumn->bytes * j; - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); - } else { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + if (!colDataIsNull_f(pCol[k].nullbitmap, j)) { + char* data = pCol[k].pData + pColumn->bytes * j; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } } } @@ -1360,6 +1356,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch dataLen += rowLen; } + taosHashCleanup(schemaHash); taosMemoryFree(pCol); blk->uid = htobe64(uid); @@ -1704,8 +1701,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { uint16_t fLen = 0; int32_t rowSize = 0; int16_t nVar = 0; - for (int i = 0; i < pSW->nCols; i++) { - SSchema* schema = &pSW->pSchema[i]; + for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { + SSchema* schema = &pTableMeta->schema[i]; fLen += TYPE_BYTES[schema->type]; rowSize += schema->bytes; if (IS_VAR_DATA_TYPE(schema->type)) { @@ -1716,7 +1713,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { int32_t rows = rspObj.resInfo.numOfRows; int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + - (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1); + (int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1); int32_t schemaLen = 0; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; @@ -1763,14 +1760,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { SRowBuilder rb = {0}; tdSRowInit(&rb, sver); - tdSRowSetTpInfo(&rb, pSW->nCols, fLen); + tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen); int32_t totalLen = 0; -// SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); -// for (int i = 0; i < pSW->nCols; i++) { -// SSchema* schema = &pSW->pSchema[i]; -// taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); -// } + SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (int i = 0; i < pSW->nCols; i++) { + SSchema* schema = &pSW->pSchema[i]; + taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); + } for (int32_t j = 0; j < rows; j++) { tdSRowResetBuf(&rb, rowData); @@ -1779,29 +1776,22 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { rspObj.resInfo.current += 1; int32_t offset = 0; - for (int32_t i = 0; i < pSW->nCols; i++) { - const SSchema* pColumn = NULL; - for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { - if (strcmp(pTableMeta->schema[k].name, pSW->pSchema[i].name) == 0) { - pColumn = &pTableMeta->schema[k]; - break; - } - } - if (pColumn == NULL){ - uError("column not exist:%s", pSW->pSchema[i].name); - code = TSDB_CODE_INVALID_PARA; - goto end; - } - char* colData = rspObj.resInfo.row[i]; - if (!colData) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, i); + for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { + const SSchema* pColumn = &pTableMeta->schema[k]; + int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); + if (!index) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k); } else { - if (IS_VAR_DATA_TYPE(pColumn->type)) { - colData -= VARSTR_HEADER_SIZE; + char* colData = rspObj.resInfo.row[*index]; + if (!colData) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + if (IS_VAR_DATA_TYPE(pColumn->type)) { + colData -= VARSTR_HEADER_SIZE; + } + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); } - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, i); } - if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) { offset += TYPE_BYTES[pColumn->type]; } @@ -1812,6 +1802,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { totalLen += rowLen; } + taosHashCleanup(schemaHash); blk->uid = htobe64(uid); blk->suid = htobe64(suid); blk->sversion = htonl(sver); @@ -1882,6 +1873,7 @@ end: return code; } + static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { int32_t code = TSDB_CODE_SUCCESS; SHashObj* pVgHash = NULL; @@ -2094,7 +2086,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) const SSchema* pColumn = &pTableMeta->schema[k]; int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); if (!index) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k); } else { char* colData = rspObj.resInfo.row[*index]; if (!colData) { @@ -2175,7 +2167,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: tDeleteSTaosxRsp(&rspObj.rsp); rspObj.resInfo.pRspMsg = NULL; doFreeReqResultInfo(&rspObj.resInfo); diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 6540fdac4c..5af9ba68b2 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -76,6 +76,32 @@ static void msg_process(TAOS_RES* msg) { } int buildDatabase(TAOS* pConn, TAOS_RES* pRes){ + + /* test for TD-20612 start*/ +// pRes = taos_query(pConn,"create table tb1 (ts timestamp, c1 int, c2 int)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn,"insert into tb1 (ts, c1) values(1669092069069, 0)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn,"insert into tb1 (ts, c2) values(1669092069069, 1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// return 0; + /* test for TD-20612 end*/ + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " "nchar(8), t4 bool)"); From b2ed72aeca780e1f156a2cbf87f62a4667ff3ae0 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 29 Nov 2022 19:54:06 +0800 Subject: [PATCH 6/6] fix:error in index --- source/client/src/clientRawBlockWrite.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 9db79c6eb6..5583c8ae00 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1330,15 +1330,15 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k); }else{ if (IS_VAR_DATA_TYPE(pColumn->type)) { - if (pCol[k].offset[j] != -1) { - char* data = pCol[k].pData + pCol[k].offset[j]; + if (pCol[*index].offset[j] != -1) { + char* data = pCol[*index].pData + pCol[*index].offset[j]; tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); } else { tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); } } else { - if (!colDataIsNull_f(pCol[k].nullbitmap, j)) { - char* data = pCol[k].pData + pColumn->bytes * j; + if (!colDataIsNull_f(pCol[*index].nullbitmap, j)) { + char* data = pCol[*index].pData + pColumn->bytes * j; tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); } else { tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);