check return code

This commit is contained in:
yihaoDeng 2024-09-26 14:01:12 +08:00
parent 7493ca5cf3
commit 9e681a2da1
6 changed files with 54 additions and 108 deletions

View File

@ -384,7 +384,7 @@ static IterateValue* idxCacheIteratorGetValue(Iterate* iter);
IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) { IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache)); IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
if (cache == NULL) { if (cache == NULL) {
indexError("failed to create index cache"); indexError("failed to create index cache since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return NULL; return NULL;
}; };
@ -392,6 +392,11 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
cache->mem->pCache = cache; cache->mem->pCache = cache;
cache->colName = cache->colName =
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName); IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName);
if (cache->colName == NULL) {
taosMemoryFree(cache);
indexError("failed to create index cache since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return NULL;
}
cache->type = type; cache->type = type;
cache->index = idx; cache->index = idx;
cache->version = 0; cache->version = 0;

View File

@ -111,9 +111,6 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
if (un->last != NULL) return; if (un->last != NULL) return;
// FstLastTransition *trn = taosMemoryMalloc(sizeof(FstLastTransition));
// trn->inp = s->data[s->start];
// trn->out = out;
int32_t len = 0; int32_t len = 0;
uint8_t* data = fstSliceData(s, &len); uint8_t* data = fstSliceData(s, &len);
un->last = fstLastTransitionCreate(data[0], out); un->last = fstLastTransitionCreate(data[0], out);
@ -126,10 +123,11 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
n->isFinal = false; n->isFinal = false;
n->finalOutput = 0; n->finalOutput = 0;
n->trans = taosArrayInit(16, sizeof(FstTransition)); n->trans = taosArrayInit(16, sizeof(FstTransition));
if (n->trans == NULL) {
taosMemoryFree(n);
return;
}
// FstLastTransition *trn = taosMemoryMalloc(sizeof(FstLastTransition));
// trn->inp = s->data[i];
// trn->out = out;
FstLastTransition* trn = fstLastTransitionCreate(data[i], 0); FstLastTransition* trn = fstLastTransitionCreate(data[i], 0);
FstBuilderNodeUnfinished un = {.node = n, .last = trn}; FstBuilderNodeUnfinished un = {.node = n, .last = trn};

View File

@ -1178,106 +1178,6 @@ _exception:
return code; return code;
} }
#ifdef BUILD_NO_CALL
static int32_t chkpIdComp(const void* a, const void* b) {
int64_t x = *(int64_t*)a;
int64_t y = *(int64_t*)b;
return x < y ? -1 : 1;
}
int32_t streamBackendLoadCheckpointInfo(void* arg) {
SStreamMeta* pMeta = arg;
int32_t code = 0;
SArray* suffix = NULL;
int32_t len = strlen(pMeta->path) + 30;
char* chkpPath = taosMemoryCalloc(1, len);
sprintf(chkpPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
if (!taosDirExist(chkpPath)) {
// no checkpoint, nothing to load
taosMemoryFree(chkpPath);
return 0;
}
TdDirPtr pDir = taosOpenDir(chkpPath);
if (pDir == NULL) {
taosMemoryFree(chkpPath);
return 0;
}
TdDirEntryPtr de = NULL;
suffix = taosArrayInit(4, sizeof(int64_t));
while ((de = taosReadDir(pDir)) != NULL) {
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
if (taosDirEntryIsDir(de)) {
char checkpointPrefix[32] = {0};
int64_t checkpointId = 0;
int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
if (ret == 1) {
taosArrayPush(suffix, &checkpointId);
}
} else {
continue;
}
}
taosArraySort(suffix, chkpIdComp);
// free previous chkpSaved
taosArrayClear(pMeta->chkpSaved);
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
int64_t id = *(int64_t*)taosArrayGet(suffix, i);
taosArrayPush(pMeta->chkpSaved, &id);
}
taosArrayDestroy(suffix);
taosCloseDir(&pDir);
taosMemoryFree(chkpPath);
return 0;
}
#endif
#ifdef BUILD_NO_CALL
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
return 0;
// SArray* pHandle = taosArrayInit(16, POINTER_BYTES);
// void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
// while (pIter) {
// int64_t id = *(int64_t*)pIter;
// SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// if (wrapper == NULL) {
// pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
// continue;
// }
// taosThreadRwlockRdlock(&wrapper->rwLock);
// for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
// if (wrapper->pHandle[i]) {
// rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
// taosArrayPush(pHandle, &p);
// }
// }
// taosThreadRwlockUnlock(&wrapper->rwLock);
// taosArrayPush(refs, &id);
// }
// int32_t nCf = taosArrayGetSize(pHandle);
// rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
// for (int i = 0; i < nCf; i++) {
// ppCf[i] = taosArrayGetP(pHandle, i);
// }
// taosArrayDestroy(pHandle);
// *ppHandle = ppCf;
// return nCf;
}
#endif
int chkpIdComp(const void* a, const void* b) { int chkpIdComp(const void* a, const void* b) {
int64_t x = *(int64_t*)a; int64_t x = *(int64_t*)a;
int64_t y = *(int64_t*)b; int64_t y = *(int64_t*)b;
@ -1288,6 +1188,9 @@ int chkpIdComp(const void* a, const void* b) {
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
int32_t code = 0; int32_t code = 0;
char* pChkpDir = taosMemoryCalloc(1, 256); char* pChkpDir = taosMemoryCalloc(1, 256);
if (pChkpDir == NULL) {
return terrno;
}
sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints"); sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
if (!taosIsDir(pChkpDir)) { if (!taosIsDir(pChkpDir)) {
@ -2376,8 +2279,14 @@ int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_
int32_t code = -1; int32_t code = -1;
char* err = NULL; char* err = NULL;
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
if (cfOpts == NULL) {
return terrno;
}
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
if (cfHandle == NULL) {
return terrno;
}
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
int32_t idx = getCfIdx(pCfNames[i]); int32_t idx = getCfIdx(pCfNames[i]);
@ -2461,6 +2370,14 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*)); pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
if (pTaskDb->pCf == NULL || pTaskDb->pCfParams == NULL || pTaskDb->pCfOpts == NULL || pTaskDb->pCompares == NULL) {
stError("failed to alloc memory for cf");
taosMemoryFreeClear(pTaskDb->pCf);
taosMemoryFreeClear(pTaskDb->pCfParams);
taosMemoryFreeClear(pTaskDb->pCfOpts);
taosMemoryFreeClear(pTaskDb->pCompares);
return;
}
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskDb->dbOpt); rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskDb->dbOpt);

View File

@ -714,6 +714,11 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
plist = taosHashGet(pool, key, klen); plist = taosHashGet(pool, key, klen);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
if (nList == NULL) {
tError("failed to alloc memory for msg list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return NULL;
}
QUEUE_INIT(&nList->msgQ); QUEUE_INIT(&nList->msgQ);
nList->numOfConn++; nList->numOfConn++;
@ -1319,6 +1324,11 @@ void cliSend(SCliConn* pConn) {
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
if (req == NULL) {
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
cliHandleExcept(pConn, -1);
}
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
if (status != 0) { if (status != 0) {
@ -1863,6 +1873,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
TAOS_UNUSED(transQueuePush(&conn->cliMsgs, pMsg)); TAOS_UNUSED(transQueuePush(&conn->cliMsgs, pMsg));
conn->dstAddr = taosStrdup(addr); conn->dstAddr = taosStrdup(addr);
if (conn->dstAddr == NULL) {
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, tstrerror(terrno));
cliHandleExcept(conn, -1);
}
uint32_t ipaddr; uint32_t ipaddr;
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr);

View File

@ -421,6 +421,9 @@ void transReqQueueInit(queue* q) {
} }
void* transReqQueuePush(queue* q) { void* transReqQueuePush(queue* q) {
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq)); STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
if (req == NULL) {
return NULL;
}
req->wreq.data = req; req->wreq.data = req;
QUEUE_PUSH(q, &req->q); QUEUE_PUSH(q, &req->q);
return &req->wreq; return &req->wreq;

View File

@ -707,6 +707,10 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
if (req == NULL) {
tError("failed to send resp since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
TAOS_UNUSED(uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb)); TAOS_UNUSED(uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb));
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrMsg* smsg) {
@ -905,6 +909,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
#endif #endif
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
if (wr == NULL) {
tError("failed to accept since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
wr->data = cli; wr->data = cli;
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));