opti:add local thread error msg array

This commit is contained in:
wangmm0220 2024-04-18 13:56:43 +08:00
parent a34b644640
commit 529ca1d1c1
7 changed files with 79 additions and 23 deletions

View File

@ -152,7 +152,8 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD *fields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);

View File

@ -33,8 +33,15 @@ extern "C" {
const char* tstrerror(int32_t err);
const char* terrstr();
#define ERR_MSG_LEN 256
char* taosGetErrMsg();
int32_t* taosGetErrno();
#define terrno (*taosGetErrno())
#define terrMsg (taosGetErrMsg())
#define SET_ERROR_MSG(MSG, ...) \
snprintf(terrMsg, ERR_MSG_LEN, MSG, ##__VA_ARGS__)
#define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error

View File

@ -1486,7 +1486,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1570,7 +1570,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1608,6 +1608,7 @@ static void* getRawDataFromRes(void* pRetrieve) {
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if (taos == NULL || data == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
return terrno;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -1620,6 +1621,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
SET_ERROR_MSG("pRequest is NULL");
return terrno;
}
@ -1631,6 +1633,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tDecoderInit(&decoder, data, dataLen);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp, *(int8_t*)data);
if (code != 0) {
SET_ERROR_MSG("decode mq data rsp failed");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
@ -1643,6 +1646,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get handle failed");
goto end;
}
@ -1655,6 +1659,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
pQuery = smlInitHandle();
if (pQuery == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
SET_ERROR_MSG("init sml handle failed");
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
@ -1666,6 +1671,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if (!tbName) {
SET_ERROR_MSG("block tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
@ -1681,12 +1687,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s meta failed", tbName);
goto end;
}
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s vgroup failed", tbName);
goto end;
}
@ -1698,6 +1706,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
SET_ERROR_MSG("calloc fields failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int i = 0; i < pSW->nCols; i++) {
@ -1706,16 +1716,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
void* rawData = getRawDataFromRes(pRetrieve);
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true);
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end;
}
taosMemoryFreeClear(pTableMeta);
}
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("sml build output failed");
goto end;
}
@ -1737,6 +1749,7 @@ end:
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if (taos == NULL || data == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
return terrno;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -1750,6 +1763,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
SET_ERROR_MSG("pRequest is NULL");
return terrno;
}
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
@ -1760,6 +1774,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp, *(int8_t*)data);
if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
@ -1772,6 +1787,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get handle failed");
goto end;
}
@ -1783,6 +1799,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
pQuery = smlInitHandle();
if (pQuery == NULL) {
SET_ERROR_MSG("init sml handle failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
@ -1797,6 +1814,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if (!tbName) {
SET_ERROR_MSG("block tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
@ -1818,6 +1836,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
code = TSDB_CODE_TMQ_INVALID_MSG;
SET_ERROR_MSG("decode create table:%s req failed", tbName);
goto end;
}
@ -1825,6 +1844,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
code = TSDB_CODE_TSC_INVALID_VALUE;
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
SET_ERROR_MSG("create table req type is not child table: %s, type: %d", tbName, pCreateReq.type);
goto end;
}
if (strcmp(tbName, pCreateReq.name) == 0) {
@ -1841,6 +1861,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s vgroup failed", tbName);
goto end;
}
@ -1854,6 +1875,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s meta failed", tbName);
goto end;
}
@ -1870,6 +1892,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
SET_ERROR_MSG("calloc fields failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int i = 0; i < pSW->nCols; i++) {
@ -1878,12 +1902,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
void* rawData = getRawDataFromRes(pRetrieve);
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true);
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end;
}
taosMemoryFreeClear(pTableMeta);
}
code = smlBuildOutput(pQuery, pVgHash);
@ -2010,6 +2035,7 @@ void tmq_free_raw(tmq_raw_data raw) {
if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw);
}
memset(terrMsg, 0, ERR_MSG_LEN);
}
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {

View File

@ -1056,17 +1056,17 @@ static void tmqMgmtInit(void) {
}
}
#define SET_ERROR_MSG(MSG) \
#define SET_ERROR_MSG_TMQ(MSG) \
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (conf == NULL) {
SET_ERROR_MSG("configure is null")
SET_ERROR_MSG_TMQ("configure is null")
return NULL;
}
taosThreadOnce(&tmqInit, tmqMgmtInit);
if (tmqInitRes != 0) {
terrno = tmqInitRes;
SET_ERROR_MSG("tmq timer init error")
SET_ERROR_MSG_TMQ("tmq timer init error")
return NULL;
}
@ -1074,7 +1074,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
SET_ERROR_MSG("malloc tmq failed")
SET_ERROR_MSG_TMQ("malloc tmq failed")
return NULL;
}
@ -1090,7 +1090,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
conf->groupId[0] == 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
SET_ERROR_MSG("malloc tmq element failed or group is empty")
SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
goto _failed;
}
@ -1123,7 +1123,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
SET_ERROR_MSG("init t_sem failed")
SET_ERROR_MSG_TMQ("init t_sem failed")
goto _failed;
}
@ -1132,13 +1132,13 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq->pTscObj == NULL) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
SET_ERROR_MSG("init tscObj failed")
SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed;
}
pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
if (pTmq->refId < 0) {
SET_ERROR_MSG("add tscObj ref failed")
SET_ERROR_MSG_TMQ("add tscObj ref failed")
goto _failed;
}

View File

@ -641,7 +641,7 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
}
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength) {
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen) {
void* tmp =
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL;
@ -662,8 +662,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
}
char* p = (char*)data;
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
// column length |
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each column length |
int32_t version = *(int32_t*)data;
p += sizeof(int32_t);
p += sizeof(int32_t);
@ -689,12 +688,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
if (tFields != NULL && numFields != numOfCols) {
uError("numFields:%d != numOfCols:%d", numFields, numOfCols);
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d != raw numOfCols:%d", numFields, numOfCols);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (tFields != NULL && numFields > boundInfo->numOfBound) {
uError("numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound);
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
@ -703,7 +702,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
if (errstr != NULL) snprintf(errstr, errstrLen, "type or bytes not equal, id:%d, type:%d, raw type:%d. bytes:%d, raw bytes:%d",
pColSchema->colId, pColSchema->type, *fields, pColSchema->bytes, *(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
@ -732,7 +732,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pColSchema = &pSchema[j];
if (strcmp(pColSchema->name, tFields[i].name) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
if (errstr != NULL) snprintf(errstr, errstrLen, "type or bytes not equal, id:%d, type:%d, raw type:%d. bytes:%d, raw bytes:%d",
pColSchema->colId, pColSchema->type, *fields, pColSchema->bytes, *(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}

View File

@ -27,8 +27,11 @@ typedef struct {
} STaosError;
static threadlocal int32_t tsErrno;
static threadlocal char tsErrMsgDetail[ERR_MSG_LEN] = {0};
static threadlocal char tsErrMsgReturn[ERR_MSG_LEN] = {0};
int32_t* taosGetErrno() { return &tsErrno; }
char* taosGetErrMsg() { return tsErrMsgDetail; }
#ifdef TAOS_ERROR_C
#define TAOS_DEFINE_ERROR(name, msg) {.val = (name), .str = (msg)},
@ -793,7 +796,13 @@ const char* tstrerror(int32_t err) {
} else if (err < val) {
e = mid;
} else if (err == val) {
return errors[mid].str;
if(strlen(tsErrMsgDetail) == 0){
return errors[mid].str;
} else{
memset(tsErrMsgReturn, 0, ERR_MSG_LEN);
snprintf(tsErrMsgReturn, ERR_MSG_LEN, "%s,detail:%s", errors[mid].str, tsErrMsgDetail);
return (const char*)tsErrMsgReturn;
}
} else {
break;
}

View File

@ -1000,6 +1000,17 @@ void testConsumeExcluded(int topic_type){
}
taos_free_result(pRes);
}
void testDetailError(){
tmq_raw_data raw = {0};
raw.raw_type = 2;
int32_t code = tmq_write_raw((TAOS *)1, raw);
ASSERT(code);
const char *err = taos_errstr(NULL);
code = strcmp("Invalid parameters,detail:taos:0x1 or data:0x0 is NULL", err);
ASSERT(code == 0);
}
int main(int argc, char* argv[]) {
for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-c") == 0) {
@ -1036,5 +1047,6 @@ int main(int argc, char* argv[]) {
testConsumeExcluded(1);
testConsumeExcluded(2);
testDetailError();
taosCloseFile(&g_fp);
}