fix:add new submit for taosx

This commit is contained in:
wangmm0220 2022-12-03 22:20:16 +08:00
parent b4a9e55b47
commit cbeec94bbe
5 changed files with 190 additions and 377 deletions

View File

@ -126,6 +126,9 @@ extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max,
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind);
void tColDataSortMerge(SArray *colDataArr);
//for raw block
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes,
int32_t nRows, char* lengthOrbitmap, char *data);
// for encode/decode
int32_t tPutColData(uint8_t *pBuf, SColData *pColData);
int32_t tGetColData(uint8_t *pBuf, SColData *pColData);

View File

@ -111,6 +111,8 @@ int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* co
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq* pCreateTb);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap);

View File

@ -1201,16 +1201,6 @@ end:
return code;
}
typedef struct {
SVgroupInfo vg;
void* data;
} VgData;
static void destroyVgHash(void* data) {
VgData* vgData = (VgData*)data;
taosMemoryFreeClear(vgData->data);
}
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields){
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
@ -1637,8 +1627,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pVgHash, destroyVgHash);
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
@ -1652,6 +1640,13 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
pQuery = smlInitHandle();
if(pQuery == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
@ -1659,14 +1654,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
goto end;
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw: setQueryResultFromRsp error");
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if (!tbName) {
@ -1680,13 +1667,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName);
VgData vgData = {0};
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
@ -1698,164 +1678,29 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end;
}
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema* schema = &pTableMeta->schema[i];
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
SSubmitReq* subReq = NULL;
SSubmitBlk* blk = NULL;
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
if (hData) {
vgData = *(VgData*)hData;
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
if (tmp == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
vgData.data = tmp;
((VgData*)hData)->data = tmp;
subReq = (SSubmitReq*)(vgData.data);
blk = POINTER_SHIFT(vgData.data, subReq->length);
} else {
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
void* tmp = taosMemoryCalloc(1, totalLen);
if (tmp == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
vgData.data = tmp;
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
subReq = (SSubmitReq*)(vgData.data);
subReq->length = sizeof(SSubmitReq);
subReq->numOfBlocks = 0;
blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
}
// pSW->pSchema should be same as pTableMeta->schema
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid;
int16_t sver = pTableMeta->sversion;
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
tdSRowInit(&rb, sver);
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
int32_t totalLen = 0;
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int i = 0; i < pSW->nCols; i++) {
SSchema* schema = &pSW->pSchema[i];
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
doSetOneRowPtr(&rspObj.resInfo);
rspObj.resInfo.current += 1;
int32_t offset = 0;
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
} else {
char* colData = rspObj.resInfo.row[*index];
if (!colData) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
} else {
if (IS_VAR_DATA_TYPE(pColumn->type)) {
colData -= VARSTR_HEADER_SIZE;
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
}
}
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
totalLen += rowLen;
}
taosHashCleanup(schemaHash);
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->sversion = htonl(sver);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(totalLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
subReq->numOfBlocks++;
taosMemoryFreeClear(pTableMeta);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
}
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
uError("create SQuery error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (NULL == pQuery->pRoot) {
uError("create pQuery->pRoot error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
int32_t numOfVg = taosHashGetSize(pVgHash);
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
while (vData) {
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
dst->vg = vData->vg;
SSubmitReq* subReq = (SSubmitReq*)(vData->data);
dst->numOfTables = subReq->numOfBlocks;
dst->size = subReq->length;
dst->pData = (char*)subReq;
vData->data = NULL; // no need free
subReq->header.vgId = htonl(dst->vg.vgId);
subReq->version = htonl(1);
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
taosArrayPush(nodeStmt->pDataBlocks, &dst);
vData = (VgData*)taosHashIterate(pVgHash, vData);
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
if (hData == NULL) {
taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, NULL);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
}
}
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("smlBuildOutput failed");
return code;
}
launchQueryImpl(pRequest, pQuery, true, NULL);
@ -1863,8 +1708,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
end:
tDeleteSMqDataRsp(&rspObj.rsp);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
@ -1907,8 +1750,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pVgHash, destroyVgHash);
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
@ -1922,6 +1763,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
pQuery = smlInitHandle();
if(pQuery == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
@ -1929,14 +1777,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
goto end;
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw: setQueryResultFromRsp error");
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if (!tbName) {
@ -1950,44 +1790,28 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName);
VgData vgData = {0};
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
// find schema data info
int32_t schemaLen = 0;
void* schemaData = NULL;
SVCreateTbReq pCreateReq = {0};
for (int j = 0; j < rspObj.rsp.createTableNum; j++) {
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
SDecoder decoderTmp = {0};
SVCreateTbReq pCreateReq = {0};
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
memset(&pCreateReq, 0, sizeof(SVCreateTbReq));
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName);
goto end;
}
ASSERT(pCreateReq.type == TSDB_CHILD_TABLE);
if (strcmp(tbName, pCreateReq.name) == 0) {
schemaLen = *lenTmp;
schemaData = *dataTmp;
strcpy(pName.tname, pCreateReq.ctb.stbName);
tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName);
break;
}
tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName);
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
@ -2001,167 +1825,23 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema* schema = &pTableMeta->schema[i];
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
SSubmitReq* subReq = NULL;
SSubmitBlk* blk = NULL;
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
if (hData) {
vgData = *(VgData*)hData;
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
if (tmp == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
vgData.data = tmp;
((VgData*)hData)->data = tmp;
subReq = (SSubmitReq*)(vgData.data);
blk = POINTER_SHIFT(vgData.data, subReq->length);
} else {
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
void* tmp = taosMemoryCalloc(1, totalLen);
if (tmp == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
vgData.data = tmp;
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
subReq = (SSubmitReq*)(vgData.data);
subReq->length = sizeof(SSubmitReq);
subReq->numOfBlocks = 0;
blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
}
// pSW->pSchema should be same as pTableMeta->schema
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid;
int16_t sver = pTableMeta->sversion;
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
if (schemaData) {
memcpy(blkSchema, schemaData, schemaLen);
}
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
tdSRowInit(&rb, sver);
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
int32_t totalLen = 0;
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int i = 0; i < pSW->nCols; i++) {
SSchema* schema = &pSW->pSchema[i];
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
doSetOneRowPtr(&rspObj.resInfo);
rspObj.resInfo.current += 1;
int32_t offset = 0;
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
} else {
char* colData = rspObj.resInfo.row[*index];
if (!colData) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
} else {
if (IS_VAR_DATA_TYPE(pColumn->type)) {
colData -= VARSTR_HEADER_SIZE;
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
}
}
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
totalLen += rowLen;
}
taosHashCleanup(schemaHash);
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->sversion = htonl(sver);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(totalLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
subReq->numOfBlocks++;
taosMemoryFreeClear(pTableMeta);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
}
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
uError("create SQuery error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (NULL == pQuery->pRoot) {
uError("create pQuery->pRoot error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
int32_t numOfVg = taosHashGetSize(pVgHash);
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
while (vData) {
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
if (hData == NULL) {
taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, &pCreateReq);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
}
dst->vg = vData->vg;
SSubmitReq* subReq = (SSubmitReq*)(vData->data);
dst->numOfTables = subReq->numOfBlocks;
dst->size = subReq->length;
dst->pData = (char*)subReq;
vData->data = NULL; // no need free
subReq->header.vgId = htonl(dst->vg.vgId);
subReq->version = htonl(1);
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
taosArrayPush(nodeStmt->pDataBlocks, &dst);
vData = (VgData*)taosHashIterate(pVgHash, vData);
}
launchQueryImpl(pRequest, pQuery, true, NULL);
@ -2169,8 +1849,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
end:
tDeleteSTaosxRsp(&rspObj.rsp);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
destroyRequest(pRequest);

View File

@ -2077,6 +2077,61 @@ _exit:
return code;
}
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes,
int32_t nRows, char* lengthOrbitmap, char *data) {
int32_t code = 0;
if (IS_VAR_DATA_TYPE(type)) { // var-length data type
for (int32_t i = 0; i < nRows; ++i) {
int32_t offset = *((int32_t*)lengthOrbitmap + i);
if (offset == -1) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
} else {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
pColData, (uint8_t *)varDataVal(data + offset), varDataLen(data + offset));
}
}
} else { // fixed-length data type
bool allValue = true;
bool allNull = true;
for (int32_t i = 0; i < nRows; ++i) {
if(!colDataIsNull_f(lengthOrbitmap, i)){
allNull = false;
}else{
allValue = false;
}
}
if (allValue) {
// optimize (todo)
for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
pColData, (uint8_t *)data + bytes * i, bytes);
}
} else if (allNull) {
// optimize (todo)
for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
}
} else {
for (int32_t i = 0; i < nRows; ++i) {
if (colDataIsNull_f(lengthOrbitmap, i)) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
} else {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
pColData, (uint8_t *)data + bytes * i, bytes);
}
}
}
}
_exit:
return code;
}
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) {
int32_t code = 0;

View File

@ -20,6 +20,7 @@
#include "parUtil.h"
#include "querynodes.h"
#include "tRealloc.h"
#include "tdatablock.h"
typedef struct SBlockKeyTuple {
TSKEY skey;
@ -205,7 +206,7 @@ void qDestroyBoundColInfo(void* pInfo) {
}
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
static int32_t createTableDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) {
@ -296,7 +297,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in
}
if (*dataBlocks == NULL) {
int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
int32_t ret = createTableDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
@ -1348,3 +1349,77 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
return code;
}
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq *pCreateTb){
STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
pTableMeta, &pCreateTb, &pTableCxt, true);
if (ret != TSDB_CODE_SUCCESS) {
uError("insGetTableDataCxt error");
goto end;
}
// no need to bind, because select * get all fields
ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) {
uError( "initTableColSubmitData error");
goto end;
}
char* p = (char*)pRsp->data;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
p += sizeof(int32_t);
p += sizeof(int32_t);
int32_t numOfRows = *(int32_t*)p;
p += sizeof(int32_t);
int32_t numOfCols = *(int32_t*)p;
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(uint64_t);
int8_t *fields = p;
p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
int32_t* colLength = (int32_t*)p;
p += sizeof(int32_t) * numOfCols;
char* pStart = p;
SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta);
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
if(boundInfo->numOfBound != numOfCols){
uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError( "type or bytes not equal");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
colLength[c] = htonl(colLength[c]);
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
}
char *pData = pStart;
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
fields += sizeof(int8_t) + sizeof(int32_t);
pStart += colLength[c];
}
end:
return ret;
}