feat:add main logic for writing raw data of tmq
This commit is contained in:
parent
6ca882510b
commit
e2414b4289
|
@ -302,8 +302,8 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
|
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -270,6 +270,7 @@ typedef enum tmq_res_t tmq_res_t;
|
||||||
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
||||||
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta);
|
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta);
|
||||||
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta);
|
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta);
|
||||||
|
DLL_EXPORT int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *res);
|
||||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
||||||
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
|
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
|
||||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||||
|
|
|
@ -244,6 +244,7 @@ void *createRequest(uint64_t connId, int32_t type) {
|
||||||
|
|
||||||
STscObj *pTscObj = acquireTscObj(connId);
|
STscObj *pTscObj = acquireTscObj(connId);
|
||||||
if (pTscObj == NULL) {
|
if (pTscObj == NULL) {
|
||||||
|
taosMemoryFree(pRequest);
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -834,7 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
tstrerror(code), pRequest->requestId);
|
tstrerror(code), pRequest->requestId);
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
pRequest->prevCode = code;
|
pRequest->prevCode = code;
|
||||||
|
|
|
@ -1489,7 +1489,7 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr
|
||||||
}
|
}
|
||||||
info->id = smlGenId();
|
info->id = smlGenId();
|
||||||
|
|
||||||
info->pQuery = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery));
|
info->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||||
if (NULL == info->pQuery) {
|
if (NULL == info->pQuery) {
|
||||||
uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
|
uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
|
|
|
@ -2952,9 +2952,241 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_free_raw_meta(tmq_raw_data* rawMeta) {
|
typedef struct{
|
||||||
//
|
SVgroupInfo vg;
|
||||||
taosMemoryFreeClear(rawMeta);
|
void *data;
|
||||||
|
}VgData;
|
||||||
|
|
||||||
|
static void destroyVgHash(void* data) {
|
||||||
|
VgData* vgData = (VgData*)data;
|
||||||
|
taosMemoryFreeClear(vgData->data);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
|
||||||
|
if (!TD_RES_TMQ(msg)) {
|
||||||
|
uError("WriteRaw:msg is not tmq : %d", *(int8_t*)msg);
|
||||||
|
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SHashObj *pVgHash = NULL;
|
||||||
|
SQuery *pQuery = NULL;
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
||||||
|
if(!pRequest){
|
||||||
|
uError("WriteRaw:createRequest error request is null");
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pRequest->pDb) {
|
||||||
|
uError("WriteRaw:not use db");
|
||||||
|
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
||||||
|
struct SCatalog *pCatalog = NULL;
|
||||||
|
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
|
if(code != TSDB_CODE_SUCCESS){
|
||||||
|
uError("WriteRaw: get gatlog error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestConnInfo conn = {0};
|
||||||
|
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||||
|
conn.requestId = pRequest->requestId;
|
||||||
|
conn.requestObjRefId = pRequest->self;
|
||||||
|
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
SMqRspObj *rspObj = ((SMqRspObj*)msg);
|
||||||
|
while (++rspObj->resIter < rspObj->rsp.blockNum) {
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj->rsp.blockData, rspObj->resIter);
|
||||||
|
if (!rspObj->rsp.withSchema) {
|
||||||
|
uError("WriteRaw:no schema, iter:%d", rspObj->resIter);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj->rsp.blockSchema, rspObj->resIter);
|
||||||
|
setResSchemaInfo(&rspObj->resInfo, pSW->pSchema, pSW->nCols);
|
||||||
|
|
||||||
|
code = setQueryResultFromRsp(&rspObj->resInfo, pRetrieve, false, false);
|
||||||
|
if(code != TSDB_CODE_SUCCESS){
|
||||||
|
uError("WriteRaw: setQueryResultFromRsp error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t fLen = 0;
|
||||||
|
int32_t rowSize = 0;
|
||||||
|
int16_t nVar = 0;
|
||||||
|
for (int i = 0; i < pSW->nCols; i++) {
|
||||||
|
SSchema *schema = pSW->pSchema + i;
|
||||||
|
fLen += TYPE_BYTES[schema->type];
|
||||||
|
rowSize += schema->bytes;
|
||||||
|
if(IS_VAR_DATA_TYPE(schema->type)){
|
||||||
|
nVar ++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t rows = rspObj->resInfo.numOfRows;
|
||||||
|
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
|
||||||
|
(int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
|
||||||
|
int32_t schemaLen = 0;
|
||||||
|
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||||
|
|
||||||
|
const char* tbName = tmq_get_table_name(msg);
|
||||||
|
if(!tbName){
|
||||||
|
uError("WriteRaw: tbname is null");
|
||||||
|
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||||
|
strcpy(pName.dbname, pRequest->pDb);
|
||||||
|
strcpy(pName.tname, tbName);
|
||||||
|
|
||||||
|
VgData vgData = {0};
|
||||||
|
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitReq* subReq = NULL;
|
||||||
|
SSubmitBlk* blk = NULL;
|
||||||
|
void *hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
||||||
|
if(hData){
|
||||||
|
vgData = *(VgData*)hData;
|
||||||
|
|
||||||
|
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
||||||
|
void *tmp = taosMemoryRealloc(vgData.data, totalLen);
|
||||||
|
if (tmp == NULL) {
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
vgData.data = tmp;
|
||||||
|
((VgData*)hData)->data = tmp;
|
||||||
|
subReq = (SSubmitReq*)(vgData.data);
|
||||||
|
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
||||||
|
}else{
|
||||||
|
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||||
|
void *tmp = taosMemoryCalloc(1, totalLen);
|
||||||
|
if (tmp == NULL) {
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
vgData.data = tmp;
|
||||||
|
taosHashPut(pVgHash, (const char *)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char *)&vgData, sizeof(vgData));
|
||||||
|
subReq = (SSubmitReq*)(vgData.data);
|
||||||
|
subReq->length = sizeof(SSubmitReq);
|
||||||
|
subReq->numOfBlocks = 0;
|
||||||
|
|
||||||
|
blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
|
||||||
|
}
|
||||||
|
|
||||||
|
STableMeta** pTableMeta = NULL;
|
||||||
|
code = catalogGetTableMeta(pCatalog, &conn, &pName, pTableMeta);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
uint64_t suid = (TSDB_NORMAL_TABLE == (*pTableMeta)->tableType ? 0 : (*pTableMeta)->suid);
|
||||||
|
uint64_t uid = (*pTableMeta)->uid;
|
||||||
|
taosMemoryFreeClear(*pTableMeta);
|
||||||
|
|
||||||
|
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
||||||
|
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||||
|
|
||||||
|
SRowBuilder rb = {0};
|
||||||
|
tdSRowInit(&rb, pSW->version);
|
||||||
|
tdSRowSetTpInfo(&rb, pSW->nCols, fLen);
|
||||||
|
int32_t dataLen = 0;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
|
tdSRowResetBuf(&rb, rowData);
|
||||||
|
|
||||||
|
doSetOneRowPtr(&rspObj->resInfo);
|
||||||
|
rspObj->resInfo.current += 1;
|
||||||
|
|
||||||
|
int32_t offset = 0;
|
||||||
|
for (int32_t k = 0; k < pSW->nCols; k++) {
|
||||||
|
const SSchema* pColumn = &pSW->pSchema[k];
|
||||||
|
void *data = rspObj->resInfo.row[k];
|
||||||
|
if (!data) {
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||||
|
} else {
|
||||||
|
if(IS_VAR_DATA_TYPE(pColumn->type)){
|
||||||
|
data -= VARSTR_HEADER_SIZE;
|
||||||
|
}
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
|
||||||
|
}
|
||||||
|
offset += TYPE_BYTES[pColumn->type];
|
||||||
|
}
|
||||||
|
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||||
|
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||||
|
dataLen += rowLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
blk->uid = htobe64(uid);
|
||||||
|
blk->suid = htobe64(suid);
|
||||||
|
blk->padding = htonl(blk->padding);
|
||||||
|
blk->sversion = htonl(pSW->version);
|
||||||
|
blk->schemaLen = htonl(schemaLen);
|
||||||
|
blk->numOfRows = htons(rows);
|
||||||
|
blk->dataLen = htonl(dataLen);
|
||||||
|
subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
|
||||||
|
subReq->numOfBlocks++;
|
||||||
|
}
|
||||||
|
|
||||||
|
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||||
|
if (NULL == pQuery) {
|
||||||
|
uError("create SQuery error");
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
|
pQuery->haveResultSet = false;
|
||||||
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
|
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||||
|
if (NULL == pQuery->pRoot) {
|
||||||
|
uError("create pQuery->pRoot error");
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
|
||||||
|
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||||
|
|
||||||
|
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||||
|
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||||
|
|
||||||
|
VgData **vData = (VgData **)taosHashIterate(pVgHash, NULL);
|
||||||
|
while (vData) {
|
||||||
|
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||||
|
if (NULL == dst) {
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
dst->vg = (*vData)->vg;
|
||||||
|
SSubmitReq* subReq = (SSubmitReq*)((*vData)->data);
|
||||||
|
dst->numOfTables = subReq->numOfBlocks;
|
||||||
|
dst->size = subReq->length;
|
||||||
|
dst->pData = (char*)subReq;
|
||||||
|
(*vData)->data = NULL; // no need free
|
||||||
|
subReq->header.vgId = htonl(dst->vg.vgId);
|
||||||
|
subReq->version = htonl(1);
|
||||||
|
subReq->header.contLen = htonl(subReq->length);
|
||||||
|
subReq->length = htonl(subReq->length);
|
||||||
|
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
||||||
|
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
||||||
|
vData = (VgData **)taosHashIterate(pVgHash, vData);
|
||||||
|
}
|
||||||
|
|
||||||
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
|
code = pRequest->code;
|
||||||
|
end:
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
destroyRequest(pRequest);
|
||||||
|
taosHashCleanup(pVgHash);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
|
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
|
||||||
|
|
Loading…
Reference in New Issue