Merge pull request #28021 from taosdata/enh/TD-31691
enh:modify error code passing
This commit is contained in:
commit
288e9adebb
|
@ -4345,7 +4345,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
|
||||||
int64_t checkPoint = 0;
|
int64_t checkPoint = 0;
|
||||||
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
|
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
|
||||||
if (taosArrayPush(result, &checkPoint) == NULL) {
|
if (taosArrayPush(result, &checkPoint) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4565,7 +4565,7 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
|
||||||
(void)strncpy(fname, name, len);
|
(void)strncpy(fname, name, len);
|
||||||
if (taosArrayPush(diff, &fname) == NULL) {
|
if (taosArrayPush(diff, &fname) == NULL) {
|
||||||
taosMemoryFree(fname);
|
taosMemoryFree(fname);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pIter = taosHashIterate(p2, pIter);
|
pIter = taosHashIterate(p2, pIter);
|
||||||
|
@ -4684,7 +4684,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
|
||||||
|
|
||||||
p->pCurrent = taosStrdup(name);
|
p->pCurrent = taosStrdup(name);
|
||||||
if (p->pCurrent == NULL) {
|
if (p->pCurrent == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -4694,7 +4694,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
|
||||||
taosMemoryFreeClear(p->pManifest);
|
taosMemoryFreeClear(p->pManifest);
|
||||||
p->pManifest = taosStrdup(name);
|
p->pManifest = taosStrdup(name);
|
||||||
if (p->pManifest == NULL) {
|
if (p->pManifest == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -4728,7 +4728,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
|
||||||
if (taosArrayPush(p->pAdd, &fname) == NULL) {
|
if (taosArrayPush(p->pAdd, &fname) == NULL) {
|
||||||
taosMemoryFree(fname);
|
taosMemoryFree(fname);
|
||||||
(void)taosThreadRwlockUnlock(&p->rwLock);
|
(void)taosThreadRwlockUnlock(&p->rwLock);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
|
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
|
||||||
|
@ -4780,7 +4780,7 @@ int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
|
||||||
p->preCkptId = -1;
|
p->preCkptId = -1;
|
||||||
p->pSST = taosArrayInit(64, sizeof(void*));
|
p->pSST = taosArrayInit(64, sizeof(void*));
|
||||||
if (p->pSST == NULL) {
|
if (p->pSST == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
dbChkpDestroy(p);
|
dbChkpDestroy(p);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4796,25 +4796,25 @@ int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
|
||||||
p->idx = 0;
|
p->idx = 0;
|
||||||
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (p->pSstTbl[0] == NULL) {
|
if (p->pSstTbl[0] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (p->pSstTbl[1] == NULL) {
|
if (p->pSstTbl[1] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
p->pAdd = taosArrayInit(64, sizeof(void*));
|
p->pAdd = taosArrayInit(64, sizeof(void*));
|
||||||
if (p->pAdd == NULL) {
|
if (p->pAdd == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
p->pDel = taosArrayInit(64, sizeof(void*));
|
p->pDel = taosArrayInit(64, sizeof(void*));
|
||||||
if (p->pDel == NULL) {
|
if (p->pDel == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -101,7 +101,7 @@ static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInf
|
||||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
void* tmp = taosArrayPush(pWinInfos, &pNewPos);
|
void* tmp = taosArrayPush(pWinInfos, &pNewPos);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
(*ppPos) = pNewPos;
|
(*ppPos) = pNewPos;
|
||||||
|
@ -126,7 +126,7 @@ static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWin
|
||||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
|
void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,7 +180,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
|
||||||
} else {
|
} else {
|
||||||
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
||||||
if (!pWinStates) {
|
if (!pWinStates) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
|
code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
|
||||||
|
@ -301,7 +301,7 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos)
|
||||||
} else {
|
} else {
|
||||||
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
||||||
if (!pWinStates) {
|
if (!pWinStates) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,7 +313,7 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos)
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
void* tmp = taosArrayPush(pWinStates, &pPos);
|
void* tmp = taosArrayPush(pWinStates, &pPos);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -324,13 +324,13 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos)
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
void* tmp = taosArrayInsert(pWinStates, index, &pPos);
|
void* tmp = taosArrayInsert(pWinStates, index, &pPos);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
void* tmp = taosArrayInsert(pWinStates, 0, &pPos);
|
void* tmp = taosArrayInsert(pWinStates, 0, &pPos);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -421,7 +421,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
|
||||||
if (!ppBuff) {
|
if (!ppBuff) {
|
||||||
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
||||||
if (!pWinStates) {
|
if (!pWinStates) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -804,7 +804,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
||||||
} else {
|
} else {
|
||||||
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
||||||
if (!pWinStates) {
|
if (!pWinStates) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -944,7 +944,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
} else {
|
} else {
|
||||||
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
||||||
if (!pWinStates) {
|
if (!pWinStates) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -204,7 +204,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
|
|
||||||
p = taosArrayPush(pSnapFile->pFileList, &item);
|
p = taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// mainfest
|
// mainfest
|
||||||
|
@ -217,7 +217,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
|
|
||||||
p = taosArrayPush(pSnapFile->pFileList, &item);
|
p = taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// options
|
// options
|
||||||
|
@ -230,7 +230,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
|
|
||||||
p = taosArrayPush(pSnapFile->pFileList, &item);
|
p = taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sst
|
// sst
|
||||||
|
@ -245,7 +245,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
|
|
||||||
p = taosArrayPush(pSnapFile->pFileList, &item);
|
p = taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,7 +255,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
|
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
|
||||||
p = taosArrayPush(pSnapFile->pFileList, &item);
|
p = taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,7 +265,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
|
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
|
||||||
p = taosArrayPush(pSnapFile->pFileList, &item);
|
p = taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -287,7 +287,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) {
|
if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) {
|
||||||
pSnapFile->pCurrent = taosStrdup(name);
|
pSnapFile->pCurrent = taosStrdup(name);
|
||||||
if (pSnapFile->pCurrent == NULL) {
|
if (pSnapFile->pCurrent == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -295,7 +295,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) {
|
if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) {
|
||||||
pSnapFile->pMainfest = taosStrdup(name);
|
pSnapFile->pMainfest = taosStrdup(name);
|
||||||
if (pSnapFile->pMainfest == NULL) {
|
if (pSnapFile->pMainfest == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -303,7 +303,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
|
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
|
||||||
pSnapFile->pOptions = taosStrdup(name);
|
pSnapFile->pOptions = taosStrdup(name);
|
||||||
if (pSnapFile->pOptions == NULL) {
|
if (pSnapFile->pOptions == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -312,7 +312,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) {
|
0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) {
|
||||||
pSnapFile->pCheckpointMeta = taosStrdup(name);
|
pSnapFile->pCheckpointMeta = taosStrdup(name);
|
||||||
if (pSnapFile->pCheckpointMeta == NULL) {
|
if (pSnapFile->pCheckpointMeta == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -321,7 +321,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) {
|
0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) {
|
||||||
pSnapFile->pCheckpointSelfcheck = taosStrdup(name);
|
pSnapFile->pCheckpointSelfcheck = taosStrdup(name);
|
||||||
if (pSnapFile->pCheckpointSelfcheck == NULL) {
|
if (pSnapFile->pCheckpointSelfcheck == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -330,13 +330,13 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
|
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
|
||||||
char* sst = taosStrdup(name);
|
char* sst = taosStrdup(name);
|
||||||
if (sst == NULL) {
|
if (sst == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* p = taosArrayPush(pSnapFile->pSst, &sst);
|
void* p = taosArrayPush(pSnapFile->pSst, &sst);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -370,7 +370,7 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke
|
||||||
pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
|
pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
|
||||||
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
|
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
|
||||||
if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) {
|
if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -426,7 +426,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
|
||||||
|
|
||||||
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
|
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
|
||||||
if (pSnapInfoSet == NULL) {
|
if (pSnapInfoSet == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
|
code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
|
||||||
|
@ -437,7 +437,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
|
||||||
|
|
||||||
pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
||||||
if (pDbSnapSet == NULL) {
|
if (pDbSnapSet == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,7 +452,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
|
||||||
|
|
||||||
void* p = taosArrayPush(pDbSnapSet, &snapFile);
|
void* p = taosArrayPush(pDbSnapSet, &snapFile);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -651,7 +651,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
|
||||||
pHandle->metaPath = taosStrdup(path);
|
pHandle->metaPath = taosStrdup(path);
|
||||||
if (pHandle->metaPath == NULL) {
|
if (pHandle->metaPath == NULL) {
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -673,7 +673,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
|
||||||
stError("failed close snaphost writer");
|
stError("failed close snaphost writer");
|
||||||
}
|
}
|
||||||
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -726,12 +726,12 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
|
||||||
item.name = taosStrdup(pHdr->name);
|
item.name = taosStrdup(pHdr->name);
|
||||||
item.type = pHdr->type;
|
item.type = pHdr->type;
|
||||||
if (item.name == NULL) {
|
if (item.name == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* p = taosArrayPush(pSnapFile->pFileList, &item);
|
void* p = taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
if (p == NULL) { // can NOT goto _err here.
|
if (p == NULL) { // can NOT goto _err here.
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SBackendFileItem* pItem2 = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
|
SBackendFileItem* pItem2 = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
|
||||||
|
@ -810,7 +810,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
|
|
||||||
void* p = taosArrayPush(pDbSnapFile->pFileList, &item);
|
void* p = taosArrayPush(pDbSnapFile->pFileList, &item);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDbSnapFile->inited = 1;
|
pDbSnapFile->inited = 1;
|
||||||
|
@ -822,7 +822,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
SBackendSnapFile2 snapFile = {0};
|
SBackendSnapFile2 snapFile = {0};
|
||||||
void* p = taosArrayPush(pHandle->pDbSnapSet, &snapFile);
|
void* p = taosArrayPush(pHandle->pDbSnapSet, &snapFile);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pHandle->currIdx += 1;
|
pHandle->currIdx += 1;
|
||||||
|
|
|
@ -70,7 +70,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
||||||
int32_t len = sizeof(SStreamScanHistoryReq);
|
int32_t len = sizeof(SStreamScanHistoryReq);
|
||||||
void* serializedReq = rpcMallocCont(len);
|
void* serializedReq = rpcMallocCont(len);
|
||||||
if (serializedReq == NULL) {
|
if (serializedReq == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(serializedReq, &req, len);
|
memcpy(serializedReq, &req, len);
|
||||||
|
|
|
@ -106,14 +106,14 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
|
||||||
stDebug("open stream state %p, %s", pState, path);
|
stDebug("open stream state %p, %s", pState, path);
|
||||||
|
|
||||||
if (pState == NULL) {
|
if (pState == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
|
pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
|
||||||
if (pState->pTdbState == NULL) {
|
if (pState->pTdbState == NULL) {
|
||||||
streamStateDestroy(pState, true);
|
streamStateDestroy(pState, true);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
||||||
int32_t childId = taosArrayGetSize(pArray);
|
int32_t childId = taosArrayGetSize(pArray);
|
||||||
pTask->info.selfChildId = childId;
|
pTask->info.selfChildId = childId;
|
||||||
void* p = taosArrayPush(pArray, &pTask);
|
void* p = taosArrayPush(pArray, &pTask);
|
||||||
return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
|
return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
|
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
|
||||||
|
@ -409,7 +409,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
|
||||||
int32_t len = strlen(pTask->pMeta->path);
|
int32_t len = strlen(pTask->pMeta->path);
|
||||||
pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
|
pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
|
||||||
if (pTask->backendPath == NULL) {
|
if (pTask->backendPath == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
|
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
|
||||||
|
@ -555,7 +555,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
|
||||||
}
|
}
|
||||||
|
|
||||||
void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
|
void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
|
||||||
return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
|
return (p == NULL) ? terrno : TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
|
int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
|
||||||
|
@ -1131,7 +1131,7 @@ int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId
|
||||||
*pId = taosStrdup(buf);
|
*pId = taosStrdup(buf);
|
||||||
|
|
||||||
if (*pId == NULL) {
|
if (*pId == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
|
||||||
if (px == NULL) {
|
if (px == NULL) {
|
||||||
stError("s-task:%s failed to add into waiting list, total waiting events:%d, code: out of memory", pTask->id.idStr,
|
stError("s-task:%s failed to add into waiting list, total waiting events:%d, code: out of memory", pTask->id.idStr,
|
||||||
(int32_t)taosArrayGetSize(pList));
|
(int32_t)taosArrayGetSize(pList));
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr,
|
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr,
|
||||||
(int32_t)taosArrayGetSize(pList));
|
(int32_t)taosArrayGetSize(pList));
|
||||||
|
@ -298,7 +298,7 @@ int32_t streamCreateStateMachine(SStreamTask* pTask) {
|
||||||
taosMemoryFree(pSM);
|
taosMemoryFree(pSM);
|
||||||
stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
|
stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the initial state for the state-machine of stream task
|
// set the initial state for the state-machine of stream task
|
||||||
|
|
|
@ -80,7 +80,7 @@ int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count) {
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
void* res = taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
void* res = taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
||||||
if (!res) {
|
if (!res) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -168,7 +168,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
|
||||||
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void*));
|
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void*));
|
||||||
if (pInfo->pTsSBFs == NULL) {
|
if (pInfo->pTsSBFs == NULL) {
|
||||||
updateInfoDestroy(pInfo);
|
updateInfoDestroy(pInfo);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
code = windowSBfAdd(pInfo, bfSize);
|
code = windowSBfAdd(pInfo, bfSize);
|
||||||
|
@ -177,7 +177,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
|
||||||
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
|
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
|
||||||
if (pInfo->pTsBuckets == NULL) {
|
if (pInfo->pTsBuckets == NULL) {
|
||||||
updateInfoDestroy(pInfo);
|
updateInfoDestroy(pInfo);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +185,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
|
||||||
for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
||||||
void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy);
|
void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -195,7 +195,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
||||||
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
||||||
if (!pInfo->pMap) {
|
if (!pInfo->pMap) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
pInfo->maxDataVersion = 0;
|
pInfo->maxDataVersion = 0;
|
||||||
|
@ -255,7 +255,7 @@ static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pInfo->pTsSBFs, &res);
|
void* tmp = taosArrayPush(pInfo->pTsSBFs, &res);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -575,7 +575,7 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
|
||||||
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
||||||
void* tmp = taosArrayPush(pInfo->pTsBuckets, &ts);
|
void* tmp = taosArrayPush(pInfo->pTsBuckets, &ts);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -594,7 +594,7 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pInfo->pTsSBFs, &pSBf);
|
void* tmp = taosArrayPush(pInfo->pTsSBFs, &pSBf);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -476,13 +476,13 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
|
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
|
||||||
if (!pPos) {
|
if (!pPos) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
|
|
||||||
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
|
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
|
||||||
if (!pPos->pKey) {
|
if (!pPos->pKey) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -699,7 +699,7 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
|
||||||
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64;
|
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64;
|
||||||
char* buf = taosMemoryCalloc(1, len);
|
char* buf = taosMemoryCalloc(1, len);
|
||||||
if (!buf) {
|
if (!buf) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -367,7 +367,7 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) {
|
||||||
}
|
}
|
||||||
pHead = rpcMallocCont(contLen);
|
pHead = rpcMallocCont(contLen);
|
||||||
if (!pHead) {
|
if (!pHead) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
|
sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
@ -2965,7 +2965,7 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
SSyncLogReplMgr* oldLogReplMgrs = NULL;
|
SSyncLogReplMgr* oldLogReplMgrs = NULL;
|
||||||
int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
|
int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA);
|
||||||
oldLogReplMgrs = taosMemoryMalloc(length);
|
oldLogReplMgrs = taosMemoryMalloc(length);
|
||||||
if (NULL == oldLogReplMgrs) return TSDB_CODE_OUT_OF_MEMORY;
|
if (NULL == oldLogReplMgrs) return terrno;
|
||||||
memset(oldLogReplMgrs, 0, length);
|
memset(oldLogReplMgrs, 0, length);
|
||||||
|
|
||||||
for (int i = 0; i < oldtotalReplicaNum; i++) {
|
for (int i = 0; i < oldtotalReplicaNum; i++) {
|
||||||
|
|
|
@ -25,7 +25,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
|
||||||
pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
|
pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncTimeout* pTimeout = pMsg->pCont;
|
SyncTimeout* pTimeout = pMsg->pCont;
|
||||||
|
@ -45,7 +45,7 @@ int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t
|
||||||
|
|
||||||
pMsg->pCont = rpcMallocCont(bytes);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
|
@ -69,7 +69,7 @@ int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry*
|
||||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncClientRequest* pClientRequest = pMsg->pCont;
|
SyncClientRequest* pClientRequest = pMsg->pCont;
|
||||||
|
@ -89,7 +89,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncRequestVote* pRequestVote = pMsg->pCont;
|
SyncRequestVote* pRequestVote = pMsg->pCont;
|
||||||
|
@ -105,7 +105,7 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
|
SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
|
||||||
|
@ -121,7 +121,7 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
|
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntries* pAppendEntries = pMsg->pCont;
|
SyncAppendEntries* pAppendEntries = pMsg->pCont;
|
||||||
|
@ -138,7 +138,7 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
|
SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
|
||||||
|
@ -155,7 +155,7 @@ int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pE
|
||||||
pRpcMsg->contLen = bytes;
|
pRpcMsg->contLen = bytes;
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
if (pRpcMsg->pCont == NULL) {
|
if (pRpcMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||||
|
@ -181,7 +181,7 @@ int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncHeartbeat* pHeartbeat = pMsg->pCont;
|
SyncHeartbeat* pHeartbeat = pMsg->pCont;
|
||||||
|
@ -197,7 +197,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
|
pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont;
|
SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont;
|
||||||
|
@ -213,7 +213,7 @@ int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
|
pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
|
SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
|
||||||
|
@ -230,7 +230,7 @@ int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
|
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
|
SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
|
||||||
|
@ -246,7 +246,7 @@ int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
|
pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont;
|
SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont;
|
||||||
|
@ -262,7 +262,7 @@ int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncLocalCmd* pLocalCmd = pMsg->pCont;
|
SyncLocalCmd* pLocalCmd = pMsg->pCont;
|
||||||
|
|
|
@ -1304,7 +1304,7 @@ int32_t syncNodeLogReplInit(SSyncNode* pNode) {
|
||||||
if (pNode->logReplMgrs[i] != NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
if (pNode->logReplMgrs[i] != NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
pNode->logReplMgrs[i] = syncLogReplCreate();
|
pNode->logReplMgrs[i] = syncLogReplCreate();
|
||||||
if (pNode->logReplMgrs[i] == NULL) {
|
if (pNode->logReplMgrs[i] == NULL) {
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(terrno);
|
||||||
}
|
}
|
||||||
pNode->logReplMgrs[i]->peerId = i;
|
pNode->logReplMgrs[i]->peerId = i;
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
|
||||||
|
|
||||||
SJson *nodeInfo = tjsonCreateArray();
|
SJson *nodeInfo = tjsonCreateArray();
|
||||||
if (nodeInfo == NULL) {
|
if (nodeInfo == NULL) {
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_CHECK_EXIT(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code = tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo)) < 0) {
|
if ((code = tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo)) < 0) {
|
||||||
|
@ -60,7 +60,7 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
|
||||||
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
|
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
|
||||||
SJson *info = tjsonCreateObject();
|
SJson *info = tjsonCreateObject();
|
||||||
if (info == NULL) {
|
if (info == NULL) {
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_CHECK_EXIT(terrno);
|
||||||
}
|
}
|
||||||
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort), NULL, _err);
|
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort), NULL, _err);
|
||||||
TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn), NULL, _err);
|
TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn), NULL, _err);
|
||||||
|
@ -97,7 +97,7 @@ static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
|
||||||
|
|
||||||
SJson *configIndexArr = tjsonCreateArray();
|
SJson *configIndexArr = tjsonCreateArray();
|
||||||
if (configIndexArr == NULL) {
|
if (configIndexArr == NULL) {
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_CHECK_EXIT(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code = tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr)) < 0) {
|
if ((code = tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr)) < 0) {
|
||||||
|
@ -108,7 +108,7 @@ static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
|
||||||
for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
|
for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
|
||||||
SJson *configIndex = tjsonCreateObject();
|
SJson *configIndex = tjsonCreateObject();
|
||||||
if (configIndex == NULL) {
|
if (configIndex == NULL) {
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_CHECK_EXIT(terrno);
|
||||||
}
|
}
|
||||||
TAOS_CHECK_EXIT(tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]));
|
TAOS_CHECK_EXIT(tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]));
|
||||||
TAOS_CHECK_EXIT(tjsonAddItemToArray(configIndexArr, configIndex));
|
TAOS_CHECK_EXIT(tjsonAddItemToArray(configIndexArr, configIndex));
|
||||||
|
@ -139,13 +139,13 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
|
||||||
(void)snprintf(file, sizeof(file), "%s.bak", realfile);
|
(void)snprintf(file, sizeof(file), "%s.bak", realfile);
|
||||||
|
|
||||||
if ((pJson = tjsonCreateObject()) == NULL) {
|
if ((pJson = tjsonCreateObject()) == NULL) {
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_CHECK_EXIT(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_EXIT(tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg));
|
TAOS_CHECK_EXIT(tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg));
|
||||||
buffer = tjsonToString(pJson);
|
buffer = tjsonToString(pJson);
|
||||||
if (buffer == NULL) {
|
if (buffer == NULL) {
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_CHECK_EXIT(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
|
@ -273,7 +273,7 @@ int32_t syncReadCfgFile(SSyncNode *pNode) {
|
||||||
|
|
||||||
pData = taosMemoryMalloc(size + 1);
|
pData = taosMemoryMalloc(size + 1);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -107,7 +107,7 @@ int32_t syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
|
||||||
pRpcMsg->contLen = (int32_t)(pEntry->dataLen);
|
pRpcMsg->contLen = (int32_t)(pEntry->dataLen);
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
if (pRpcMsg->pCont == NULL) {
|
if (pRpcMsg->pCont == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,7 @@ int32_t raftStoreReadFile(SSyncNode *pNode) {
|
||||||
|
|
||||||
pData = taosMemoryMalloc(size + 1);
|
pData = taosMemoryMalloc(size + 1);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
TAOS_CHECK_GOTO(terrno, &lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosReadFile(pFile, pData, size) != size) {
|
if (taosReadFile(pFile, pData, size) != size) {
|
||||||
|
@ -120,11 +120,11 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
||||||
snprintf(file, sizeof(file), "%s.bak", realfile);
|
snprintf(file, sizeof(file), "%s.bak", realfile);
|
||||||
|
|
||||||
pJson = tjsonCreateObject();
|
pJson = tjsonCreateObject();
|
||||||
if (pJson == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
if (pJson == NULL) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
|
||||||
if (raftStoreEncode(pJson, pStore) != 0) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
if (raftStoreEncode(pJson, pStore) != 0) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
|
|
||||||
buffer = tjsonToString(pJson);
|
buffer = tjsonToString(pJson);
|
||||||
if (buffer == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
if (buffer == NULL) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
|
||||||
|
|
||||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
if (pFile == NULL) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
|
if (pFile == NULL) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
|
||||||
|
|
|
@ -32,7 +32,7 @@ int32_t syncRespMgrCreate(void *data, int64_t ttl, SSyncRespMgr **ppObj) {
|
||||||
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
if (pObj->pRespHash == NULL) {
|
if (pObj->pRespHash == NULL) {
|
||||||
taosMemoryFree(pObj);
|
taosMemoryFree(pObj);
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->ttl = ttl;
|
pObj->ttl = ttl;
|
||||||
|
@ -132,7 +132,7 @@ static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
|
|
||||||
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
|
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
|
||||||
if (delIndexArray == NULL) {
|
if (delIndexArray == NULL) {
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
sDebug("vgId:%d, resp manager begin clean by ttl", pNode->vgId);
|
sDebug("vgId:%d, resp manager begin clean by ttl", pNode->vgId);
|
||||||
|
|
|
@ -720,7 +720,7 @@ static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshot
|
||||||
SSnapshotParam *pParam = &pReceiver->snapshotParam;
|
SSnapshotParam *pParam = &pReceiver->snapshotParam;
|
||||||
data = taosMemoryRealloc(pParam->data, dataLen);
|
data = taosMemoryRealloc(pParam->data, dataLen);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
|
sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
|
||||||
tstrerror(code), dataLen);
|
tstrerror(code), dataLen);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -1094,7 +1094,7 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe
|
||||||
SSnapshotParam *pParam = &pSender->snapshotParam;
|
SSnapshotParam *pParam = &pSender->snapshotParam;
|
||||||
void *data = taosMemoryRealloc(pParam->data, dataLen);
|
void *data = taosMemoryRealloc(pParam->data, dataLen);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(terrno);
|
||||||
}
|
}
|
||||||
(void)memcpy(data, pMsg->data, dataLen);
|
(void)memcpy(data, pMsg->data, dataLen);
|
||||||
|
|
||||||
|
|
|
@ -499,7 +499,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
|
||||||
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) {
|
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) {
|
||||||
void* data = taosMemoryRealloc(pSnap->data, size);
|
void* data = taosMemoryRealloc(pSnap->data, size);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
pSnap->data = data;
|
pSnap->data = data;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -79,7 +79,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
|
||||||
|
|
||||||
pBt = (SBTree *)tdbOsCalloc(1, sizeof(*pBt));
|
pBt = (SBTree *)tdbOsCalloc(1, sizeof(*pBt));
|
||||||
if (pBt == NULL) {
|
if (pBt == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pBt->keyLen
|
// pBt->keyLen
|
||||||
|
|
|
@ -58,14 +58,14 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
|
||||||
|
|
||||||
pCache = (SPCache *)tdbOsCalloc(1, sizeof(*pCache) + sizeof(SPage *) * cacheSize);
|
pCache = (SPCache *)tdbOsCalloc(1, sizeof(*pCache) + sizeof(SPage *) * cacheSize);
|
||||||
if (pCache == NULL) {
|
if (pCache == NULL) {
|
||||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->szPage = pageSize;
|
pCache->szPage = pageSize;
|
||||||
pCache->nPages = cacheSize;
|
pCache->nPages = cacheSize;
|
||||||
pCache->aPage = (SPage **)tdbOsCalloc(cacheSize, sizeof(SPage *));
|
pCache->aPage = (SPage **)tdbOsCalloc(cacheSize, sizeof(SPage *));
|
||||||
if (pCache->aPage == NULL) {
|
if (pCache->aPage == NULL) {
|
||||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tdbPCacheOpenImpl(pCache);
|
code = tdbPCacheOpenImpl(pCache);
|
||||||
|
@ -105,7 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) {
|
||||||
} else if (pCache->nPages < nPage) {
|
} else if (pCache->nPages < nPage) {
|
||||||
SPage **aPage = tdbOsCalloc(nPage, sizeof(SPage *));
|
SPage **aPage = tdbOsCalloc(nPage, sizeof(SPage *));
|
||||||
if (aPage == NULL) {
|
if (aPage == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) {
|
for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) {
|
||||||
|
@ -502,7 +502,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
|
||||||
pCache->nHash = pCache->nPages < 8 ? 8 : pCache->nPages;
|
pCache->nHash = pCache->nPages < 8 ? 8 : pCache->nPages;
|
||||||
pCache->pgHash = (SPage **)tdbOsCalloc(pCache->nHash, sizeof(SPage *));
|
pCache->pgHash = (SPage **)tdbOsCalloc(pCache->nHash, sizeof(SPage *));
|
||||||
if (pCache->pgHash == NULL) {
|
if (pCache->pgHash == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open LRU list
|
// Open LRU list
|
||||||
|
|
|
@ -322,7 +322,7 @@ int32_t tdbPageCopy(SPage *pFromPage, SPage *pToPage, int deepCopyOvfl) {
|
||||||
pNewCell = (SCell *)tdbOsMalloc(szCell);
|
pNewCell = (SCell *)tdbOsMalloc(szCell);
|
||||||
if (pNewCell == NULL) {
|
if (pNewCell == NULL) {
|
||||||
tdbError("tdb/page-copy: out of memory, size: %d", szCell);
|
tdbError("tdb/page-copy: out of memory, size: %d", szCell);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
memcpy(pNewCell, pFromPage->apOvfl[iOvfl], szCell);
|
memcpy(pNewCell, pFromPage->apOvfl[iOvfl], szCell);
|
||||||
tdbTrace("tdbPage/copy/new ovfl cell: %p/%p/%p", pNewCell, pToPage, pFromPage);
|
tdbTrace("tdbPage/copy/new ovfl cell: %p/%p/%p", pNewCell, pToPage, pFromPage);
|
||||||
|
@ -524,7 +524,7 @@ static int tdbPageDefragment(SPage *pPage) {
|
||||||
|
|
||||||
SCellIdx *aCellIdx = (SCellIdx *)tdbOsMalloc(sizeof(SCellIdx) * nCell);
|
SCellIdx *aCellIdx = (SCellIdx *)tdbOsMalloc(sizeof(SCellIdx) * nCell);
|
||||||
if (aCellIdx == NULL) {
|
if (aCellIdx == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
for (int32_t iCell = 0; iCell < nCell; iCell++) {
|
for (int32_t iCell = 0; iCell < nCell; iCell++) {
|
||||||
aCellIdx[iCell].iCell = iCell;
|
aCellIdx[iCell].iCell = iCell;
|
||||||
|
|
|
@ -167,7 +167,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
|
||||||
+ fsize + 8 + 1; /* jFileName */
|
+ fsize + 8 + 1; /* jFileName */
|
||||||
pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
|
pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
|
||||||
if (pPtr == NULL) {
|
if (pPtr == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pPager = (SPager *)pPtr;
|
pPager = (SPager *)pPtr;
|
||||||
|
@ -508,7 +508,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
||||||
|
|
||||||
u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
|
u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
|
||||||
if (pageBuf == NULL) {
|
if (pageBuf == NULL) {
|
||||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbDebug("pager/abort: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
|
tdbDebug("pager/abort: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
|
||||||
|
@ -741,7 +741,7 @@ int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
|
||||||
|
|
||||||
if (pPager->frps) {
|
if (pPager->frps) {
|
||||||
if (taosArrayPush(pPager->frps, &pgno) == NULL) {
|
if (taosArrayPush(pPager->frps, &pgno) == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
pPage->pPager = NULL;
|
pPage->pPager = NULL;
|
||||||
return code;
|
return code;
|
||||||
|
@ -749,7 +749,7 @@ int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
|
||||||
|
|
||||||
pPager->frps = taosArrayInit(8, sizeof(SPgno));
|
pPager->frps = taosArrayInit(8, sizeof(SPgno));
|
||||||
if (pPager->frps == NULL) {
|
if (pPager->frps == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
// memset(pPage->pData, 0, pPage->pageSize);
|
// memset(pPage->pData, 0, pPage->pageSize);
|
||||||
tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno);
|
tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno);
|
||||||
|
@ -1056,7 +1056,7 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
|
||||||
|
|
||||||
pageBuf = tdbOsCalloc(1, pPager->pageSize);
|
pageBuf = tdbOsCalloc(1, pPager->pageSize);
|
||||||
if (pageBuf == NULL) {
|
if (pageBuf == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName);
|
tdbDebug("pager/restore: %p, %d/%d, txnId:%s", pPager, pPager->dbOrigSize, pPager->dbFileSize, jFileName);
|
||||||
|
@ -1141,7 +1141,7 @@ int tdbPagerRestoreJournals(SPager *pPager) {
|
||||||
|
|
||||||
SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
|
SArray *pTxnList = taosArrayInit(16, sizeof(int64_t));
|
||||||
if (pTxnList == NULL) {
|
if (pTxnList == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
|
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
|
||||||
|
@ -1150,7 +1150,7 @@ int tdbPagerRestoreJournals(SPager *pPager) {
|
||||||
int64_t txnId = -1;
|
int64_t txnId = -1;
|
||||||
(void)sscanf(name, TDB_MAINDB_NAME "-journal.%" PRId64, &txnId);
|
(void)sscanf(name, TDB_MAINDB_NAME "-journal.%" PRId64, &txnId);
|
||||||
if (taosArrayPush(pTxnList, &txnId) == NULL) {
|
if (taosArrayPush(pTxnList, &txnId) == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
|
||||||
|
|
||||||
pTb = (TTB *)tdbOsCalloc(1, sizeof(*pTb));
|
pTb = (TTB *)tdbOsCalloc(1, sizeof(*pTb));
|
||||||
if (pTb == NULL) {
|
if (pTb == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pTb->pEnv
|
// pTb->pEnv
|
||||||
|
@ -221,7 +221,7 @@ int tdbTbcOpen(TTB *pTb, TBC **ppTbc, TXN *pTxn) {
|
||||||
*ppTbc = NULL;
|
*ppTbc = NULL;
|
||||||
pTbc = (TBC *)tdbOsMalloc(sizeof(*pTbc));
|
pTbc = (TBC *)tdbOsMalloc(sizeof(*pTbc));
|
||||||
if (pTbc == NULL) {
|
if (pTbc == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((ret = tdbBtcOpen(&pTbc->btc, pTb->pBt, pTxn)) != 0) {
|
if ((ret = tdbBtcOpen(&pTbc->btc, pTb->pBt, pTxn)) != 0) {
|
||||||
|
|
|
@ -50,7 +50,7 @@ int32_t tfsOpen(SDiskCfg *pCfg, int32_t ndisk, STfs **ppTfs) {
|
||||||
|
|
||||||
pTfs->hash = taosHashInit(TFS_MAX_DISKS * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
pTfs->hash = taosHashInit(TFS_MAX_DISKS * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
if (pTfs->hash == NULL) {
|
if (pTfs->hash == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t idisk = 0; idisk < ndisk; idisk++) {
|
for (int32_t idisk = 0; idisk < ndisk; idisk++) {
|
||||||
|
@ -284,7 +284,7 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) {
|
||||||
if (errno == ENOENT) {
|
if (errno == ENOENT) {
|
||||||
// Try to create upper
|
// Try to create upper
|
||||||
if ((s = taosStrdup(rname)) == NULL) {
|
if ((s = taosStrdup(rname)) == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a copy of dirname(s) because the implementation of 'dirname' differs on different platforms.
|
// Make a copy of dirname(s) because the implementation of 'dirname' differs on different platforms.
|
||||||
|
@ -693,7 +693,7 @@ static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter) {
|
||||||
int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
|
int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
|
||||||
pInfo->datadirs = taosArrayInit(32, sizeof(SMonDiskDesc));
|
pInfo->datadirs = taosArrayInit(32, sizeof(SMonDiskDesc));
|
||||||
if (pInfo->datadirs == NULL) {
|
if (pInfo->datadirs == NULL) {
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfsUpdateSize(pTfs);
|
tfsUpdateSize(pTfs);
|
||||||
|
@ -711,7 +711,7 @@ int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) {
|
||||||
TAOS_UNUSED(tfsUnLock(pTfs));
|
TAOS_UNUSED(tfsUnLock(pTfs));
|
||||||
taosArrayDestroy(pInfo->datadirs);
|
taosArrayDestroy(pInfo->datadirs);
|
||||||
pInfo->datadirs = NULL;
|
pInfo->datadirs = NULL;
|
||||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
TAOS_RETURN(terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ int32_t tfsNewDisk(int32_t level, int32_t id, int8_t disable, const char *path,
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pDisk->path = taosStrdup(path)) == NULL) {
|
if ((pDisk->path = taosStrdup(path)) == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
pDisk->level = level;
|
pDisk->level = level;
|
||||||
|
|
Loading…
Reference in New Issue