opti:[TD-28118] raw block data for tmq

This commit is contained in:
wangmm0220 2024-01-19 15:28:23 +08:00
parent 1b85adaa3a
commit f95d5e4f62
3 changed files with 4 additions and 5 deletions

View File

@ -150,7 +150,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen); char* msgBuf, int32_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength); int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);

View File

@ -1838,12 +1838,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
} }
void* rawData = getRawDataFromRes(pRetrieve); void* rawData = getRawDataFromRes(pRetrieve);
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, fields, pSW->nCols, true); code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true);
taosMemoryFree(fields); taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
pCreateReqDst = NULL;
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
} }

View File

@ -640,12 +640,12 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
return false; return false;
} }
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength) { int numFields, bool needChangeLength) {
void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true, false); sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("insGetTableDataCxt error"); uError("insGetTableDataCxt error");
goto end; goto end;