fix:modify log

This commit is contained in:
wangmm0220 2023-10-18 18:47:04 +08:00
parent ea4e322427
commit b15b61f9ff
1 changed files with 89 additions and 113 deletions

View File

@ -15,15 +15,14 @@
#include "cJSON.h"
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tqueue.h"
#include "tref.h"
#include "ttimer.h"
#define LOG_ID_TAG "connId:0x%"PRIx64",reqId:0x%"PRIx64
#define LOG_ID_VALUE *(int64_t*)taos,pRequest->requestId
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
@ -32,6 +31,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
uError("create json object failed")
return NULL;
}
cJSON* type = cJSON_CreateString("create");
@ -106,6 +106,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
json = cJSON_CreateObject();
if (json == NULL) {
uError("create json object failed");
goto end;
}
cJSON* type = cJSON_CreateString("alter");
@ -192,7 +193,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
SDecoder coder;
char* string = NULL;
uDebug("processCreateStb called");
uDebug("create stable data:%p", metaRsp);
// decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
@ -202,8 +203,8 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
goto _err;
}
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
uDebug("processCreateStb %s", string);
_err:
uDebug("create stable return, sql json:%s", string);
tDecoderClear(&coder);
return string;
}
@ -212,7 +213,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0};
SDecoder coder;
char* string = NULL;
uDebug("processAlterStb called");
uDebug("alter stable data:%p", metaRsp);
// decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -223,9 +224,8 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
goto _err;
}
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
uDebug("processAlterStb %s", string);
_err:
uDebug("alter stable return, sql json:%s", string);
tDecoderClear(&coder);
return string;
}
@ -251,12 +251,14 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
SArray* pTagVals = NULL;
int32_t code = tTagToValArray(pTag, &pTagVals);
if (code) {
uError("tTagToValArray failed code:%d", code);
goto end;
}
if (tTagIsJson(pTag)) {
STag* p = (STag*)pTag;
if (p->nTag == 0) {
uError("p->nTag == 0");
goto end;
}
char* pJson = parseTagDatatoJson(pTag);
@ -322,6 +324,7 @@ static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
uError("create json object failed");
return NULL;
}
cJSON* type = cJSON_CreateString("create");
@ -353,7 +356,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
SVCreateTbReq* pCreateReq;
char* string = NULL;
// decode
uDebug("processCreateTable called");
uDebug("create table data:%p", metaRsp);
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
@ -370,10 +373,10 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
string =
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
}
uDebug("processCreateTable :%s", string);
}
_exit:
uDebug("create table return, sql json:%s", string);
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
@ -387,9 +390,9 @@ _exit:
}
static char* processAutoCreateTable(STaosxRsp* rsp) {
uDebug("processAutoCreateTable called");
uDebug("auto create table data:%p", rsp);
if (rsp->createTableNum <= 0) {
uError("WriteRaw:processAutoCreateTable rsp->createTableNum <= 0");
uError("processAutoCreateTable rsp->createTableNum <= 0");
goto _exit;
}
@ -408,13 +411,13 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
}
if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) {
uError("WriteRaw:processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
goto _exit;
}
}
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
uDebug("processAutoCreateTable :%s", string);
_exit:
uDebug("auto created table return, sql json:%s", string);
for (int i = 0; i < rsp->createTableNum; i++) {
tDecoderClear(&decoder[i]);
taosMemoryFreeClear(pCreateReq[i].comment);
@ -433,17 +436,19 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
char* string = NULL;
cJSON* json = NULL;
uDebug("processAlterTable called");
uDebug("alter table data:%p", metaRsp);
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
uError("tDecodeSVAlterTbReq error");
goto _exit;
}
json = cJSON_CreateObject();
if (json == NULL) {
uError("create json object failed");
goto _exit;
}
cJSON* type = cJSON_CreateString("alter");
@ -543,9 +548,9 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
break;
}
string = cJSON_PrintUnformatted(json);
uDebug("processAlterTable :%s", string);
_exit:
uDebug("alter table return, sql json:%s", string);
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
@ -556,18 +561,20 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SVDropStbReq req = {0};
char* string = NULL;
cJSON* json = NULL;
uDebug("processDropSTable called");
uDebug("processDropSTable data:%p", metaRsp);
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
uError("tDecodeSVDropStbReq failed");
goto _exit;
}
json = cJSON_CreateObject();
if (json == NULL) {
uError("create json object failed");
goto _exit;
}
cJSON* type = cJSON_CreateString("drop");
@ -578,8 +585,8 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "tableName", tableName);
string = cJSON_PrintUnformatted(json);
uDebug("processDropSTable :%s", string);
_exit:
uDebug("processDropSTable return, sql json:%s", string);
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
@ -587,18 +594,17 @@ _exit:
static char* processDeleteTable(SMqMetaRsp* metaRsp) {
SDeleteRes req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
cJSON* json = NULL;
char* string = NULL;
uDebug("processDeleteTable called");
uDebug("processDeleteTable data:%p", metaRsp);
// decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeDeleteRes(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
uError("tDecodeDeleteRes failed");
goto _exit;
}
@ -606,10 +612,10 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
char sql[256] = {0};
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
req.tsColName, req.skey, req.tsColName, req.ekey);
uDebug("delete sql:%s\n", sql);
json = cJSON_CreateObject();
if (json == NULL) {
uError("creaet json object failed");
goto _exit;
}
cJSON* type = cJSON_CreateString("delete");
@ -618,8 +624,8 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "sql", sqlJson);
string = cJSON_PrintUnformatted(json);
uDebug("processDeleteTable :%s", string);
_exit:
uDebug("processDeleteTable return, sql json:%s", string);
cJSON_Delete(json);
tDecoderClear(&coder);
return string;
@ -631,17 +637,19 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
char* string = NULL;
cJSON* json = NULL;
uDebug("processDropTable called");
uDebug("processDropTable data:%p", metaRsp);
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
uError("tDecodeSVDropTbBatchReq failed");
goto _exit;
}
json = cJSON_CreateObject();
if (json == NULL) {
uError("create json object failed");
goto _exit;
}
cJSON* type = cJSON_CreateString("drop");
@ -661,26 +669,26 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "tableNameList", tableNameList);
string = cJSON_PrintUnformatted(json);
uDebug("processDropTable :%s", string);
_exit:
uDebug("processDropTable return, json sql:%s", string);
cJSON_Delete(json);
tDecoderClear(&decoder);
return string;
}
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) return TSDB_CODE_INVALID_PARA;
SVCreateStbReq req = {0};
SDecoder coder;
SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
uDebug("taosCreateStb called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
return code;
}
uDebug(LOG_ID_TAG" create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -719,7 +727,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.source = TD_REQ_FROM_TAOX;
pReq.igExists = true;
uDebug("taosCreateStb name:%s suid:%" PRId64 " processSuid:%" PRId64, req.name, req.suid, pReq.suid);
uDebug(LOG_ID_TAG" create stable name:%s suid:%" PRId64 " processSuid:%" PRId64,
LOG_ID_VALUE, req.name, req.suid, pReq.suid);
STscObj* pTscObj = pRequest->pTscObj;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@ -753,6 +762,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
taosMemoryFree(pCmdMsg.pMsg);
end:
uDebug(LOG_ID_TAG" create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder);
@ -760,18 +770,19 @@ end:
}
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) return TSDB_CODE_INVALID_PARA;
SVDropStbReq req = {0};
SDecoder coder = {0};
SMDropStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
uDebug("taosDropStb called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
return code;
}
uDebug(LOG_ID_TAG" drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -805,7 +816,6 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
if (code != TSDB_CODE_SUCCESS) {
uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", req.name);
goto end;
}
pReq.suid = pTableMeta->uid;
@ -816,7 +826,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.source = TD_REQ_FROM_TAOX;
// pReq.suid = processSuid(req.suid, pRequest->pDb);
uDebug("taosDropStb name:%s suid:%" PRId64 " new suid:%" PRId64, req.name, req.suid, pReq.suid);
uDebug(LOG_ID_TAG" drop stable name:%s suid:%" PRId64 " new suid:%" PRId64,
LOG_ID_VALUE, req.name, req.suid, pReq.suid);
STscObj* pTscObj = pRequest->pTscObj;
SName tableName = {0};
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@ -850,6 +861,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
taosMemoryFree(pCmdMsg.pMsg);
end:
uDebug(LOG_ID_TAG" drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
destroyRequest(pRequest);
tDecoderClear(&coder);
return code;
@ -867,6 +879,7 @@ static void destroyCreateTbReqBatch(void* data) {
}
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) return TSDB_CODE_INVALID_PARA;
SVCreateTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -874,12 +887,13 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL;
uDebug("taosCreateTable called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
return code;
}
uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -939,12 +953,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.stbName);
goto end;
}
pCreateReq->ctb.suid = pTableMeta->uid;
uDebug("taosCreateTable name:%s sname:%s suid:%" PRId64 " new suid:%" PRId64, pCreateReq->name,
pCreateReq->ctb.stbName, oldSuid, pCreateReq->ctb.suid);
for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
@ -999,6 +1010,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
end:
uDebug(LOG_ID_TAG" create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
@ -1026,6 +1038,7 @@ static void destroyDropTbReqBatch(void* data) {
}
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) return TSDB_CODE_INVALID_PARA;
SVDropTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -1033,11 +1046,12 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL;
uDebug("taosDropTable called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
return code;
}
uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -1095,13 +1109,12 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
continue;
}
if (code != TSDB_CODE_SUCCESS) {
uError("taosDropTable:catalogGetTableMeta failed. table name: %s", pDropReq->name);
goto end;
}
tb_uid_t oldSuid = pDropReq->suid;
pDropReq->suid = pTableMeta->suid;
taosMemoryFreeClear(pTableMeta);
uDebug("taosDropTable name:%s suid:%" PRId64 " new suid:%" PRId64, pDropReq->name, oldSuid, pDropReq->suid);
uDebug(LOG_ID_TAG" drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid, pDropReq->suid);
taosArrayPush(pRequest->tableList, &pName);
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
@ -1144,6 +1157,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
end:
uDebug(LOG_ID_TAG" drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
@ -1184,11 +1198,14 @@ end:
//}
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
SDeleteRes req = {0};
SDecoder coder = {0};
if(taos == NULL || meta == NULL) return TSDB_CODE_INVALID_PARA;
SDeleteRes req = {0};
SDecoder coder = {0};
char sql[256] = {0};
int32_t code = TSDB_CODE_SUCCESS;
uDebug("taosDeleteData called");
uDebug("connId:0x%"PRIx64" delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
@ -1199,10 +1216,8 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
}
// getTbName(req.tableFName);
char sql[256] = {0};
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
req.tsColName, req.skey, req.tsColName, req.ekey);
uDebug("delete sql:%s\n", sql);
TAOS_RES* res = taos_query(taos, sql);
SRequestObj* pRequest = (SRequestObj*)res;
@ -1213,11 +1228,13 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
taos_free_result(res);
end:
uDebug("connId:0x%"PRIx64" delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
tDecoderClear(&coder);
return code;
}
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) return TSDB_CODE_INVALID_PARA;
SVAlterTbReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -1226,13 +1243,11 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
SArray* pArray = NULL;
SVgDataBlocks* pVgData = NULL;
uDebug("taosAlterTable called");
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
return code;
}
uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -1272,7 +1287,6 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
uDebug("taosAlterTable name:%s", req.tbName);
pArray = taosArrayInit(1, sizeof(void*));
if (NULL == pArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1327,6 +1341,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
}
}
end:
uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
taosArrayDestroy(pArray);
if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData);
@ -1345,18 +1360,17 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL;
SHashObj* pVgHash = NULL;
uDebug("taos_write_raw_block_with_fields called");
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
uError("taos_write_raw_block_with_fields:createRequest error request is null");
code = terrno;
goto end;
return terrno;
}
uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d",
LOG_ID_VALUE, rows, pData, tbname, fields, numFields);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
uError("taos_write_raw_block_with_fields:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
@ -1365,11 +1379,9 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
tstrncpy(pName.tname, tbname, sizeof(pName.tname));
uDebug("taos_write_raw_block_with_fields name:%s", tbname);
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block_with_fields: get gatlog error");
goto end;
}
@ -1382,13 +1394,11 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
SVgroupInfo vgData = {0};
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block_with_fields:catalogGetTableHashVgroup failed. table name: %s", tbname);
goto end;
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block_with_fields:catalogGetTableMeta failed. table name: %s", tbname);
goto end;
}
// uError("td23101 0vgId:%d, vgId:%d, name:%s, uid:%"PRIu64, vgData.vgId, pTableMeta->vgId, tbname, pTableMeta->uid);
@ -1400,18 +1410,14 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
// uError("td23101 1vgId:%d, numEps:%d, name:%s, uid:%"PRIu64, vgData.vgId, vgData.epSet.numOfEps, tbname,
// pTableMeta->uid);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block_with_fields:rawBlockBindData failed");
goto end;
}
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block_with_fields:smlBuildOutput failed");
goto end;
}
@ -1419,6 +1425,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
code = pRequest->code;
end:
uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
@ -1435,17 +1442,16 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
SQuery* pQuery = NULL;
SHashObj* pVgHash = NULL;
uDebug("taos_write_raw_block called");
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
uError("taos_write_raw_block:createRequest error request is null");
code = terrno;
goto end;
return code;
}
uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
uError("taos_write_raw_block:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
@ -1454,11 +1460,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
tstrncpy(pName.tname, tbname, sizeof(pName.tname));
uDebug("taos_write_raw_block name:%s", tbname);
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block: get gatlog error");
goto end;
}
@ -1471,13 +1475,11 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
SVgroupInfo vgData = {0};
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block:catalogGetTableHashVgroup failed. table name: %s", tbname);
goto end;
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block:catalogGetTableMeta failed. table name: %s", tbname);
goto end;
}
pQuery = smlInitHandle();
@ -1490,13 +1492,11 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block:rawBlockBindData failed");
goto end;
}
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("taos_write_raw_block:smlBuildOutput failed");
goto end;
}
@ -1504,6 +1504,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
code = pRequest->code;
end:
uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
@ -1518,15 +1519,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
uDebug("writeraw data called");
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
uError("writeraw data:createRequest error request is null");
return terrno;
}
uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
pRequest->syncQuery = true;
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ;
@ -1534,13 +1534,11 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tDecoderInit(&decoder, data, dataLen);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp);
if (code != 0) {
uError("writeraw data:decode smqDataRsp error");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
if (!pRequest->pDb) {
uError("writeraw data:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
@ -1548,7 +1546,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw data: get gatlog error");
goto end;
}
@ -1564,22 +1561,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
uDebug("writeraw data raw data block num:%d", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) {
uError("writeraw data:no schema, iter:%d", rspObj.resIter);
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if (!tbName) {
uError("writeraw data: tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
uDebug("writeraw data raw data tbname:%s", tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName);
@ -1591,14 +1584,12 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw data:catalogGetTableMeta failed. table name: %s", tbName);
goto end;
}
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw data:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
@ -1620,7 +1611,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw data:rawBlockBindData failed");
goto end;
}
taosMemoryFreeClear(pTableMeta);
@ -1628,7 +1618,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw data:smlBuildOutput failed");
goto end;
}
@ -1636,6 +1625,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
code = pRequest->code;
end:
uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteMqDataRsp(&rspObj.rsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
@ -1653,15 +1643,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
SVCreateTbReq* pCreateReqDst = NULL;
uDebug("writeraw meta data called");
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
uError("writeraw meta data:createRequest error request is null");
return terrno;
}
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
pRequest->syncQuery = true;
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ_METADATA;
@ -1669,13 +1657,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp);
if (code != 0) {
uError("writeraw meta data:decode smqDataRsp error");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
if (!pRequest->pDb) {
uError("writeraw meta data:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
@ -1683,7 +1669,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw meta data: get gatlog error");
goto end;
}
@ -1700,22 +1685,20 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
uDebug("writeraw meta data raw data block num:%d", rspObj.rsp.blockNum);
uDebug(LOG_ID_TAG" write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) {
uError("writeraw meta data:no schema, iter:%d", rspObj.resIter);
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if (!tbName) {
uError("writeraw meta data: tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
uDebug("writeraw meta data raw data tbname:%s\n", tbName);
uDebug(LOG_ID_TAG" write raw metadata block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName);
@ -1731,13 +1714,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
uError("writeraw meta data: tDecodeSVCreateTbReq error");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
if (pCreateReq.type != TSDB_CHILD_TABLE) {
uError("writeraw meta data:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName);
code = TSDB_CODE_TSC_INVALID_VALUE;
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
@ -1757,7 +1738,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw meta data:catalogGetTableHashVgroup failed. table name: %s", tbName);
goto end;
}
@ -1771,7 +1751,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw meta data:catalogGetTableMeta failed. table name: %s", tbName);
goto end;
}
@ -1798,7 +1777,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw meta data:rawBlockBindData failed");
goto end;
}
pCreateReqDst = NULL;
@ -1807,7 +1785,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("writeraw meta data:smlBuildOutput failed");
goto end;
}
@ -1815,6 +1792,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
code = pRequest->code;
end:
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteSTaosxRsp(&rspObj.rsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
@ -1830,7 +1808,7 @@ end:
char* tmq_get_json_meta(TAOS_RES* res) {
if (res == NULL) return NULL;
uDebug("tmq_get_json_meta called");
uDebug("tmq_get_json_meta res:%p", res);
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) {
return NULL;
}
@ -1865,7 +1843,6 @@ char* tmq_get_json_meta(TAOS_RES* res) {
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
uDebug("tmq_get_raw called");
if (!raw || !res) {
return TSDB_CODE_INVALID_PARA;
}
@ -1874,7 +1851,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw = pMetaRspObj->metaRsp.metaRsp;
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
uDebug("tmq_get_raw meta");
uDebug("tmq get raw type meta:%p", raw);
} else if (TD_RES_TMQ(res)) {
SMqRspObj* rspObj = ((SMqRspObj*)res);
@ -1894,7 +1871,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw = buf;
raw->raw_len = len;
raw->raw_type = RES_TYPE__TMQ;
uDebug("tmq_get_raw data");
uDebug("tmq get raw type data:%p", raw);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
@ -1914,23 +1891,22 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw = buf;
raw->raw_len = len;
raw->raw_type = RES_TYPE__TMQ_METADATA;
uDebug("tmq_get_raw meta data");
uDebug("tmq get raw type metadata:%p", raw);
} else {
uError("tmq_get_raw error:%d", *(int8_t*)res);
uError("tmq get raw error type:%d", *(int8_t*)res);
return TSDB_CODE_TMQ_INVALID_MSG;
}
return TSDB_CODE_SUCCESS;
}
void tmq_free_raw(tmq_raw_data raw) {
uDebug("tmq_free_raw raw_type:%d", raw.raw_type);
uDebug("tmq free raw data type:%d", raw.raw_type);
if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw);
}
}
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
uDebug("tmq_write_raw called");
if (!taos) {
return TSDB_CODE_INVALID_PARA;
}