Merge pull request #28126 from taosdata/fix/checkReturnCode
check return code
This commit is contained in:
commit
0ebbd730ed
|
@ -315,6 +315,10 @@ SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
||||||
}
|
}
|
||||||
mtq->opera = opera;
|
mtq->opera = opera;
|
||||||
mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
||||||
|
if (mtq->query == NULL) {
|
||||||
|
taosMemoryFree(mtq);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
return mtq;
|
return mtq;
|
||||||
}
|
}
|
||||||
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
|
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
|
||||||
|
@ -359,10 +363,22 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
|
||||||
len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
|
len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
|
||||||
} else if (colVal == NULL) {
|
} else if (colVal == NULL) {
|
||||||
buf = strndup(INDEX_DATA_NULL_STR, (int32_t)strlen(INDEX_DATA_NULL_STR));
|
buf = strndup(INDEX_DATA_NULL_STR, (int32_t)strlen(INDEX_DATA_NULL_STR));
|
||||||
|
if (buf == NULL) {
|
||||||
|
taosMemoryFree(tm->colName);
|
||||||
|
taosMemoryFree(tm);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
len = (int32_t)strlen(INDEX_DATA_NULL_STR);
|
len = (int32_t)strlen(INDEX_DATA_NULL_STR);
|
||||||
} else {
|
} else {
|
||||||
static const char* emptyStr = " ";
|
static const char* emptyStr = " ";
|
||||||
buf = strndup(emptyStr, (int32_t)strlen(emptyStr));
|
buf = strndup(emptyStr, (int32_t)strlen(emptyStr));
|
||||||
|
if (buf == NULL) {
|
||||||
|
taosMemoryFree(tm->colName);
|
||||||
|
taosMemoryFree(tm);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
len = (int32_t)strlen(emptyStr);
|
len = (int32_t)strlen(emptyStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -384,7 +384,7 @@ static IterateValue* idxCacheIteratorGetValue(Iterate* iter);
|
||||||
IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
|
IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
|
||||||
IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
|
IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
indexError("failed to create index cache");
|
indexError("failed to create index cache since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
return NULL;
|
return NULL;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -392,6 +392,11 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
|
||||||
cache->mem->pCache = cache;
|
cache->mem->pCache = cache;
|
||||||
cache->colName =
|
cache->colName =
|
||||||
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName);
|
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName);
|
||||||
|
if (cache->colName == NULL) {
|
||||||
|
taosMemoryFree(cache);
|
||||||
|
indexError("failed to create index cache since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
cache->type = type;
|
cache->type = type;
|
||||||
cache->index = idx;
|
cache->index = idx;
|
||||||
cache->version = 0;
|
cache->version = 0;
|
||||||
|
@ -831,6 +836,7 @@ static MemTable* idxInternalCacheCreate(int8_t type) {
|
||||||
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
|
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
|
||||||
|
|
||||||
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
||||||
|
if (tbl == NULL) return NULL;
|
||||||
idxMemRef(tbl);
|
idxMemRef(tbl);
|
||||||
// if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR || ttype == TSDB_DATA_TYPE_GEOMETRY) {
|
// if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR || ttype == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet);
|
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet);
|
||||||
|
|
|
@ -389,6 +389,9 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_USMALLINT:
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
*dst = taosMemoryCalloc(1, bufSize + 1);
|
*dst = taosMemoryCalloc(1, bufSize + 1);
|
||||||
|
if (*dst == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
TAOS_UNUSED(idxInt2str(*(uint16_t*)src, *dst, -1));
|
TAOS_UNUSED(idxInt2str(*(uint16_t*)src, *dst, -1));
|
||||||
tlen = strlen(*dst);
|
tlen = strlen(*dst);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -111,9 +111,6 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
|
||||||
|
|
||||||
if (un->last != NULL) return;
|
if (un->last != NULL) return;
|
||||||
|
|
||||||
// FstLastTransition *trn = taosMemoryMalloc(sizeof(FstLastTransition));
|
|
||||||
// trn->inp = s->data[s->start];
|
|
||||||
// trn->out = out;
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
uint8_t* data = fstSliceData(s, &len);
|
uint8_t* data = fstSliceData(s, &len);
|
||||||
un->last = fstLastTransitionCreate(data[0], out);
|
un->last = fstLastTransitionCreate(data[0], out);
|
||||||
|
@ -126,10 +123,11 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
|
||||||
n->isFinal = false;
|
n->isFinal = false;
|
||||||
n->finalOutput = 0;
|
n->finalOutput = 0;
|
||||||
n->trans = taosArrayInit(16, sizeof(FstTransition));
|
n->trans = taosArrayInit(16, sizeof(FstTransition));
|
||||||
|
if (n->trans == NULL) {
|
||||||
|
taosMemoryFree(n);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// FstLastTransition *trn = taosMemoryMalloc(sizeof(FstLastTransition));
|
|
||||||
// trn->inp = s->data[i];
|
|
||||||
// trn->out = out;
|
|
||||||
FstLastTransition* trn = fstLastTransitionCreate(data[i], 0);
|
FstLastTransition* trn = fstLastTransitionCreate(data[i], 0);
|
||||||
|
|
||||||
FstBuilderNodeUnfinished un = {.node = n, .last = trn};
|
FstBuilderNodeUnfinished un = {.node = n, .last = trn};
|
||||||
|
@ -1159,11 +1157,20 @@ FStmSt* stmStCreate(Fst* fst, FAutoCtx* automation, FstBoundWithData* min, FstBo
|
||||||
sws->fst = fst;
|
sws->fst = fst;
|
||||||
sws->aut = automation;
|
sws->aut = automation;
|
||||||
sws->inp = (SArray*)taosArrayInit(256, sizeof(uint8_t));
|
sws->inp = (SArray*)taosArrayInit(256, sizeof(uint8_t));
|
||||||
|
if (sws->inp == NULL) {
|
||||||
|
taosMemoryFree(sws);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
sws->emptyOutput.null = true;
|
sws->emptyOutput.null = true;
|
||||||
sws->emptyOutput.out = 0;
|
sws->emptyOutput.out = 0;
|
||||||
|
|
||||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState));
|
sws->stack = (SArray*)taosArrayInit(256, sizeof(FstStreamState));
|
||||||
|
if (sws->stack == NULL) {
|
||||||
|
taosArrayDestroy(sws->inp);
|
||||||
|
taosMemoryFree(sws);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
sws->endAt = max;
|
sws->endAt = max;
|
||||||
TAOS_UNUSED(stmStSeekMin(sws, min));
|
TAOS_UNUSED(stmStSeekMin(sws, min));
|
||||||
|
|
||||||
|
|
|
@ -174,7 +174,18 @@ FAutoCtx* automCtxCreate(void* data, AutomationType atype) {
|
||||||
// add more search type
|
// add more search type
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx->data = (data != NULL ? taosStrdup((char*)data) : NULL);
|
// ctx->data = (data != NULL ? taosStrdup((char*)data) : NULL);
|
||||||
|
if (data != NULL) {
|
||||||
|
ctx->data = taosStrdup((char*)data);
|
||||||
|
if (ctx->data == NULL) {
|
||||||
|
startWithStateValueDestroy(sv);
|
||||||
|
taosMemoryFree(ctx);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ctx->data = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
ctx->type = atype;
|
ctx->type = atype;
|
||||||
ctx->stdata = (void*)sv;
|
ctx->stdata = (void*)sv;
|
||||||
return ctx;
|
return ctx;
|
||||||
|
|
|
@ -120,6 +120,8 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo
|
||||||
uint64_t end = start + registry->mruSize;
|
uint64_t end = start + registry->mruSize;
|
||||||
|
|
||||||
FstRegistryEntry* entry = taosMemoryMalloc(sizeof(FstRegistryEntry));
|
FstRegistryEntry* entry = taosMemoryMalloc(sizeof(FstRegistryEntry));
|
||||||
|
if (entry == NULL) return NULL;
|
||||||
|
|
||||||
if (end - start == 1) {
|
if (end - start == 1) {
|
||||||
FstRegistryCell* cell = taosArrayGet(registry->table, start);
|
FstRegistryCell* cell = taosArrayGet(registry->table, start);
|
||||||
// cell->isNode &&
|
// cell->isNode &&
|
||||||
|
|
|
@ -28,6 +28,12 @@ FstSparseSet *sparSetCreate(int32_t sz) {
|
||||||
|
|
||||||
ss->dense = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t));
|
ss->dense = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t));
|
||||||
ss->sparse = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t));
|
ss->sparse = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t));
|
||||||
|
if (ss->dense == NULL || ss->sparse == NULL) {
|
||||||
|
taosMemoryFree(ss->dense);
|
||||||
|
taosMemoryFree(ss->sparse);
|
||||||
|
taosMemoryFree(ss);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
sparSetInitBuf(ss->dense, sz);
|
sparSetInitBuf(ss->dense, sz);
|
||||||
sparSetInitBuf(ss->sparse, sz);
|
sparSetInitBuf(ss->sparse, sz);
|
||||||
|
|
||||||
|
|
|
@ -77,10 +77,13 @@ CompiledAddr unpackDelta(char* data, uint64_t len, uint64_t nodeAddr) {
|
||||||
|
|
||||||
FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
|
FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
|
||||||
FstString* str = (FstString*)taosMemoryMalloc(sizeof(FstString));
|
FstString* str = (FstString*)taosMemoryMalloc(sizeof(FstString));
|
||||||
|
if (str == NULL) {
|
||||||
|
return (FstSlice){.str = NULL, .start = 0, .end = 0};
|
||||||
|
}
|
||||||
str->ref = 1;
|
str->ref = 1;
|
||||||
str->len = len;
|
str->len = len;
|
||||||
str->data = taosMemoryMalloc(len * sizeof(uint8_t));
|
str->data = taosMemoryMalloc(len * sizeof(uint8_t));
|
||||||
if (str == NULL || str->data == NULL) {
|
if (str->data == NULL) {
|
||||||
taosMemoryFree(str);
|
taosMemoryFree(str);
|
||||||
return (FstSlice){.str = NULL, .start = 0, .end = 0};
|
return (FstSlice){.str = NULL, .start = 0, .end = 0};
|
||||||
}
|
}
|
||||||
|
@ -107,9 +110,16 @@ FstSlice fstSliceDeepCopy(FstSlice* s, int32_t start, int32_t end) {
|
||||||
uint8_t* data = fstSliceData(s, &slen);
|
uint8_t* data = fstSliceData(s, &slen);
|
||||||
|
|
||||||
uint8_t* buf = taosMemoryMalloc(sizeof(uint8_t) * tlen);
|
uint8_t* buf = taosMemoryMalloc(sizeof(uint8_t) * tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
return (FstSlice){.str = NULL, .start = 0, .end = 0};
|
||||||
|
}
|
||||||
memcpy(buf, data + start, tlen);
|
memcpy(buf, data + start, tlen);
|
||||||
|
|
||||||
FstString* str = taosMemoryMalloc(sizeof(FstString));
|
FstString* str = taosMemoryMalloc(sizeof(FstString));
|
||||||
|
if (str == NULL) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return (FstSlice){.str = NULL, .start = 0, .end = 0};
|
||||||
|
}
|
||||||
str->data = buf;
|
str->data = buf;
|
||||||
str->len = tlen;
|
str->len = tlen;
|
||||||
str->ref = 1;
|
str->ref = 1;
|
||||||
|
|
|
@ -1178,106 +1178,6 @@ _exception:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
static int32_t chkpIdComp(const void* a, const void* b) {
|
|
||||||
int64_t x = *(int64_t*)a;
|
|
||||||
int64_t y = *(int64_t*)b;
|
|
||||||
return x < y ? -1 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
|
||||||
SStreamMeta* pMeta = arg;
|
|
||||||
int32_t code = 0;
|
|
||||||
SArray* suffix = NULL;
|
|
||||||
|
|
||||||
int32_t len = strlen(pMeta->path) + 30;
|
|
||||||
char* chkpPath = taosMemoryCalloc(1, len);
|
|
||||||
sprintf(chkpPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
|
|
||||||
|
|
||||||
if (!taosDirExist(chkpPath)) {
|
|
||||||
// no checkpoint, nothing to load
|
|
||||||
taosMemoryFree(chkpPath);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdDirPtr pDir = taosOpenDir(chkpPath);
|
|
||||||
if (pDir == NULL) {
|
|
||||||
taosMemoryFree(chkpPath);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdDirEntryPtr de = NULL;
|
|
||||||
suffix = taosArrayInit(4, sizeof(int64_t));
|
|
||||||
|
|
||||||
while ((de = taosReadDir(pDir)) != NULL) {
|
|
||||||
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
|
|
||||||
|
|
||||||
if (taosDirEntryIsDir(de)) {
|
|
||||||
char checkpointPrefix[32] = {0};
|
|
||||||
int64_t checkpointId = 0;
|
|
||||||
|
|
||||||
int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
|
|
||||||
if (ret == 1) {
|
|
||||||
taosArrayPush(suffix, &checkpointId);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
taosArraySort(suffix, chkpIdComp);
|
|
||||||
// free previous chkpSaved
|
|
||||||
taosArrayClear(pMeta->chkpSaved);
|
|
||||||
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
|
|
||||||
int64_t id = *(int64_t*)taosArrayGet(suffix, i);
|
|
||||||
taosArrayPush(pMeta->chkpSaved, &id);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(suffix);
|
|
||||||
taosCloseDir(&pDir);
|
|
||||||
taosMemoryFree(chkpPath);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
|
|
||||||
return 0;
|
|
||||||
// SArray* pHandle = taosArrayInit(16, POINTER_BYTES);
|
|
||||||
// void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
|
|
||||||
// while (pIter) {
|
|
||||||
// int64_t id = *(int64_t*)pIter;
|
|
||||||
|
|
||||||
// SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);
|
|
||||||
// if (wrapper == NULL) {
|
|
||||||
// pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// taosThreadRwlockRdlock(&wrapper->rwLock);
|
|
||||||
// for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
|
||||||
// if (wrapper->pHandle[i]) {
|
|
||||||
// rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
|
|
||||||
// taosArrayPush(pHandle, &p);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// taosThreadRwlockUnlock(&wrapper->rwLock);
|
|
||||||
|
|
||||||
// taosArrayPush(refs, &id);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// int32_t nCf = taosArrayGetSize(pHandle);
|
|
||||||
|
|
||||||
// rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
|
||||||
// for (int i = 0; i < nCf; i++) {
|
|
||||||
// ppCf[i] = taosArrayGetP(pHandle, i);
|
|
||||||
// }
|
|
||||||
// taosArrayDestroy(pHandle);
|
|
||||||
|
|
||||||
// *ppHandle = ppCf;
|
|
||||||
// return nCf;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int chkpIdComp(const void* a, const void* b) {
|
int chkpIdComp(const void* a, const void* b) {
|
||||||
int64_t x = *(int64_t*)a;
|
int64_t x = *(int64_t*)a;
|
||||||
int64_t y = *(int64_t*)b;
|
int64_t y = *(int64_t*)b;
|
||||||
|
@ -1288,6 +1188,9 @@ int chkpIdComp(const void* a, const void* b) {
|
||||||
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char* pChkpDir = taosMemoryCalloc(1, 256);
|
char* pChkpDir = taosMemoryCalloc(1, 256);
|
||||||
|
if (pChkpDir == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
||||||
if (!taosIsDir(pChkpDir)) {
|
if (!taosIsDir(pChkpDir)) {
|
||||||
|
@ -2376,8 +2279,14 @@ int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
|
||||||
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
||||||
|
if (cfOpts == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
||||||
|
if (cfHandle == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < nCf; i++) {
|
for (int i = 0; i < nCf; i++) {
|
||||||
int32_t idx = getCfIdx(pCfNames[i]);
|
int32_t idx = getCfIdx(pCfNames[i]);
|
||||||
|
@ -2461,6 +2370,14 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
|
||||||
pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
|
pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
|
||||||
pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
||||||
pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
|
pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
|
||||||
|
if (pTaskDb->pCf == NULL || pTaskDb->pCfParams == NULL || pTaskDb->pCfOpts == NULL || pTaskDb->pCompares == NULL) {
|
||||||
|
stError("failed to alloc memory for cf");
|
||||||
|
taosMemoryFreeClear(pTaskDb->pCf);
|
||||||
|
taosMemoryFreeClear(pTaskDb->pCfParams);
|
||||||
|
taosMemoryFreeClear(pTaskDb->pCfOpts);
|
||||||
|
taosMemoryFreeClear(pTaskDb->pCompares);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < nCf; i++) {
|
for (int i = 0; i < nCf; i++) {
|
||||||
rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskDb->dbOpt);
|
rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskDb->dbOpt);
|
||||||
|
|
|
@ -266,16 +266,23 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port,
|
||||||
msg->server = taosStrdup(server);
|
msg->server = taosStrdup(server);
|
||||||
msg->uri = taosStrdup(uri);
|
msg->uri = taosStrdup(uri);
|
||||||
msg->cont = taosMemoryMalloc(contLen);
|
msg->cont = taosMemoryMalloc(contLen);
|
||||||
if (qid != NULL)
|
|
||||||
msg->qid = taosStrdup(qid);
|
|
||||||
else
|
|
||||||
msg->qid = NULL;
|
|
||||||
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
|
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
|
||||||
httpDestroyMsg(msg);
|
httpDestroyMsg(msg);
|
||||||
*httpMsg = NULL;
|
*httpMsg = NULL;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (qid != NULL) {
|
||||||
|
msg->qid = taosStrdup(qid);
|
||||||
|
if (msg->qid == NULL) {
|
||||||
|
httpDestroyMsg(msg);
|
||||||
|
*httpMsg = NULL;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
msg->qid = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
memcpy(msg->cont, pCont, contLen);
|
memcpy(msg->cont, pCont, contLen);
|
||||||
msg->len = contLen;
|
msg->len = contLen;
|
||||||
msg->flag = flag;
|
msg->flag = flag;
|
||||||
|
|
|
@ -673,6 +673,10 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||||
plist = taosHashGet(pool, key, klen);
|
plist = taosHashGet(pool, key, klen);
|
||||||
|
|
||||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
|
if (nList == NULL) {
|
||||||
|
tError("failed to alloc memory for msg list, reason:%s", tstrerror(terrno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
QUEUE_INIT(&nList->msgQ);
|
QUEUE_INIT(&nList->msgQ);
|
||||||
nList->numOfConn++;
|
nList->numOfConn++;
|
||||||
|
|
||||||
|
@ -720,6 +724,11 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||||
plist = taosHashGet(pool, key, klen);
|
plist = taosHashGet(pool, key, klen);
|
||||||
|
|
||||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||||
|
if (nList == NULL) {
|
||||||
|
tError("failed to alloc memory for msg list, reason:%s", tstrerror(terrno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
QUEUE_INIT(&nList->msgQ);
|
QUEUE_INIT(&nList->msgQ);
|
||||||
nList->numOfConn++;
|
nList->numOfConn++;
|
||||||
|
|
||||||
|
@ -1303,11 +1312,16 @@ void cliSend(SCliConn* pConn) {
|
||||||
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
||||||
if (timer == NULL) {
|
if (timer == NULL) {
|
||||||
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||||
|
if (timer == NULL) {
|
||||||
|
tError("failed to alloc timer since %s", tstrerror(terrno));
|
||||||
|
}
|
||||||
tDebug("no available timer, create a timer %p", timer);
|
tDebug("no available timer, create a timer %p", timer);
|
||||||
TAOS_UNUSED(uv_timer_init(pThrd->loop, timer));
|
TAOS_UNUSED(uv_timer_init(pThrd->loop, timer));
|
||||||
}
|
}
|
||||||
timer->data = pConn;
|
if (timer != NULL) {
|
||||||
pConn->timer = timer;
|
timer->data = pConn;
|
||||||
|
pConn->timer = timer;
|
||||||
|
}
|
||||||
|
|
||||||
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
|
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
|
||||||
TAOS_UNUSED(uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0));
|
TAOS_UNUSED(uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0));
|
||||||
|
@ -1327,6 +1341,11 @@ void cliSend(SCliConn* pConn) {
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
||||||
|
if (req == NULL) {
|
||||||
|
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
|
||||||
|
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
cliHandleExcept(pConn, -1);
|
||||||
|
}
|
||||||
|
|
||||||
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
|
@ -1871,6 +1890,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
TAOS_UNUSED(transQueuePush(&conn->cliMsgs, pMsg));
|
TAOS_UNUSED(transQueuePush(&conn->cliMsgs, pMsg));
|
||||||
|
|
||||||
conn->dstAddr = taosStrdup(addr);
|
conn->dstAddr = taosStrdup(addr);
|
||||||
|
if (conn->dstAddr == NULL) {
|
||||||
|
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, tstrerror(terrno));
|
||||||
|
cliHandleExcept(conn, -1);
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t ipaddr;
|
uint32_t ipaddr;
|
||||||
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr);
|
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr);
|
||||||
|
@ -2546,6 +2569,10 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst));
|
cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst));
|
||||||
|
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
|
if (arg == NULL) {
|
||||||
|
tError("failed to malloc memory, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return;
|
||||||
|
}
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
arg->param2 = pThrd;
|
arg->param2 = pThrd;
|
||||||
|
|
||||||
|
|
|
@ -421,6 +421,9 @@ void transReqQueueInit(queue* q) {
|
||||||
}
|
}
|
||||||
void* transReqQueuePush(queue* q) {
|
void* transReqQueuePush(queue* q) {
|
||||||
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
|
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
|
||||||
|
if (req == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
req->wreq.data = req;
|
req->wreq.data = req;
|
||||||
QUEUE_PUSH(q, &req->q);
|
QUEUE_PUSH(q, &req->q);
|
||||||
return &req->wreq;
|
return &req->wreq;
|
||||||
|
|
|
@ -707,6 +707,10 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
|
||||||
|
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
||||||
|
if (req == NULL) {
|
||||||
|
tError("failed to send resp since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return;
|
||||||
|
}
|
||||||
TAOS_UNUSED(uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb));
|
TAOS_UNUSED(uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb));
|
||||||
}
|
}
|
||||||
static void uvStartSendResp(SSvrMsg* smsg) {
|
static void uvStartSendResp(SSvrMsg* smsg) {
|
||||||
|
@ -842,6 +846,10 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
|
||||||
|
|
||||||
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
|
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
|
||||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (srvMsg == NULL) {
|
||||||
|
tError("failed to alloc buf to send release resp since %s", tstrerror(terrno));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
srvMsg->msg = tmsg;
|
srvMsg->msg = tmsg;
|
||||||
srvMsg->type = Release;
|
srvMsg->type = Release;
|
||||||
srvMsg->pConn = pConn;
|
srvMsg->pConn = pConn;
|
||||||
|
@ -905,6 +913,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
||||||
|
if (wr == NULL) {
|
||||||
|
tError("failed to accept since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
wr->data = cli;
|
wr->data = cli;
|
||||||
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
||||||
|
|
||||||
|
@ -1401,6 +1414,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
|
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
|
||||||
|
if (thrd == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
thrd->pTransInst = shandle;
|
thrd->pTransInst = shandle;
|
||||||
thrd->quit = false;
|
thrd->quit = false;
|
||||||
thrd->pTransInst = shandle;
|
thrd->pTransInst = shandle;
|
||||||
|
@ -1654,6 +1671,10 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
|
||||||
}
|
}
|
||||||
void sendQuitToWorkThrd(SWorkThrd* pThrd) {
|
void sendQuitToWorkThrd(SWorkThrd* pThrd) {
|
||||||
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||||
|
if (msg == NULL) {
|
||||||
|
tError("failed to send quit msg to work thread since %s", tstrerror(terrno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
msg->type = Quit;
|
msg->type = Quit;
|
||||||
tDebug("server send quit msg to work thread");
|
tDebug("server send quit msg to work thread");
|
||||||
TAOS_UNUSED(transAsyncSend(pThrd->asyncPool, &msg->q));
|
TAOS_UNUSED(transAsyncSend(pThrd->asyncPool, &msg->q));
|
||||||
|
|
Loading…
Reference in New Issue