Merge pull request #26688 from taosdata/enh/TD-29367-10
enh: refact morecode
This commit is contained in:
commit
6f3d3a4097
|
@ -556,6 +556,7 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x061B)
|
||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061C)
|
||||
#define TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE TAOS_DEF_ERROR_CODE(0, 0x061D)
|
||||
#define TSDB_CODE_TDB_INCONSISTENT_DB_ID TAOS_DEF_ERROR_CODE(0, 0x061E)
|
||||
|
||||
// query
|
||||
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
||||
|
|
|
@ -190,10 +190,10 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
|
|||
task->prev->next = task->next;
|
||||
task->next->prev = task->prev;
|
||||
if (task->cancel) {
|
||||
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = task->cancel,
|
||||
.arg = task->arg,
|
||||
});
|
||||
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = task->cancel,
|
||||
.arg = task->arg,
|
||||
}));
|
||||
}
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
}
|
||||
|
@ -206,6 +206,9 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
SVWorker *worker = (SVWorker *)arg;
|
||||
SVAsync *async = worker->async;
|
||||
SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
|
||||
if (cancelArray == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
setThreadName(async->label);
|
||||
|
||||
|
@ -466,7 +469,7 @@ int32_t vnodeAsyncOpen(int32_t numOfThreads) {
|
|||
vnodeAsyncSetWorkers(2, numOfThreads);
|
||||
|
||||
_exit:
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeAsyncClose() {
|
||||
|
@ -748,10 +751,10 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
|||
task->prev->next = task->next;
|
||||
task->next->prev = task->prev;
|
||||
if (task->cancel) {
|
||||
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = task->cancel,
|
||||
.arg = task->arg,
|
||||
});
|
||||
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = task->cancel,
|
||||
.arg = task->arg,
|
||||
}));
|
||||
}
|
||||
vnodeAsyncTaskDone(async, task);
|
||||
}
|
||||
|
@ -763,10 +766,10 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
|||
channel->scheduled->prev->next = channel->scheduled->next;
|
||||
channel->scheduled->next->prev = channel->scheduled->prev;
|
||||
if (channel->scheduled->cancel) {
|
||||
taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = channel->scheduled->cancel,
|
||||
.arg = channel->scheduled->arg,
|
||||
});
|
||||
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){
|
||||
.cancel = channel->scheduled->cancel,
|
||||
.arg = channel->scheduled->arg,
|
||||
}));
|
||||
}
|
||||
vnodeAsyncTaskDone(async, channel->scheduled);
|
||||
}
|
||||
|
|
|
@ -16,13 +16,12 @@
|
|||
#include "vnd.h"
|
||||
|
||||
/* ------------------------ STRUCTURES ------------------------ */
|
||||
static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) {
|
||||
static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) {
|
||||
SVBufPool *pPool;
|
||||
|
||||
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
|
||||
if (pPool == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memset(pPool, 0, sizeof(SVBufPool));
|
||||
|
||||
|
@ -44,14 +43,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPoo
|
|||
pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
|
||||
if (!pPool->lock) {
|
||||
taosMemoryFree(pPool);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (taosThreadSpinInit(pPool->lock, 0) != 0) {
|
||||
taosMemoryFree((void *)pPool->lock);
|
||||
taosMemoryFree(pPool);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
return terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
} else {
|
||||
pPool->lock = NULL;
|
||||
|
@ -77,10 +74,11 @@ int vnodeOpenBufPool(SVnode *pVnode) {
|
|||
|
||||
for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
|
||||
// create pool
|
||||
if (vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i])) {
|
||||
int32_t code;
|
||||
if ((code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]))) {
|
||||
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
vnodeCloseBufPool(pVnode);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// add to free list
|
||||
|
@ -274,8 +272,6 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosThreadMutexLock(&pPool->mutex);
|
||||
|
||||
pQNode->pNext = pPool->qList.pNext;
|
||||
|
@ -285,9 +281,7 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
|
|||
pPool->nQuery++;
|
||||
|
||||
taosThreadMutexUnlock(&pPool->mutex);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
|
||||
|
|
|
@ -394,8 +394,7 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
|
|||
}
|
||||
|
||||
if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) {
|
||||
terrno = TSDB_CODE_VND_HASH_MISMATCH;
|
||||
return -1;
|
||||
return terrno = TSDB_CODE_VND_HASH_MISMATCH;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -129,17 +129,17 @@ void initTqAPI(SStoreTqReader* pTq) {
|
|||
pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable;
|
||||
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
|
||||
|
||||
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
|
||||
// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
|
||||
// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
|
||||
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
|
||||
pTq->tqGetResultBlock = tqGetResultBlock;
|
||||
|
||||
// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
||||
|
||||
pTq->tqGetStreamExecProgress = tqGetStreamExecInfo;
|
||||
}
|
||||
}
|
||||
|
||||
void initStateStoreAPI(SStateStore* pStore) {
|
||||
pStore->streamFileStateInit = streamFileStateInit;
|
||||
|
|
|
@ -19,31 +19,19 @@
|
|||
static volatile int32_t VINIT = 0;
|
||||
|
||||
int vnodeInit(int nthreads) {
|
||||
int32_t init;
|
||||
|
||||
init = atomic_val_compare_exchange_32(&VINIT, 0, 1);
|
||||
if (init) {
|
||||
if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (vnodeAsyncOpen(nthreads) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walInit() < 0) {
|
||||
return -1;
|
||||
}
|
||||
TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads));
|
||||
TAOS_CHECK_RETURN(walInit());
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeCleanup() {
|
||||
int32_t init = atomic_val_compare_exchange_32(&VINIT, 1, 0);
|
||||
if (init == 0) return;
|
||||
|
||||
// set stop
|
||||
if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return;
|
||||
vnodeAsyncClose();
|
||||
|
||||
walCleanUp();
|
||||
smaCleanUp();
|
||||
}
|
||||
|
|
|
@ -435,6 +435,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_NOT_EXIST, "Stable not exists")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, "Table schema is old")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, "Table already exists in other stables")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INCONSISTENT_DB_ID, "Inconsistent database id")
|
||||
|
||||
// query
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
|
||||
|
|
|
@ -49,8 +49,7 @@ int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t
|
|||
|
||||
int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double number) {
|
||||
if (NULL == cJSON_AddNumberToObject((cJSON*)pJson, pName, number)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -58,8 +57,7 @@ int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double num
|
|||
|
||||
int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean) {
|
||||
if (NULL == cJSON_AddBoolToObject((cJSON*)pJson, pName, boolean)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -67,8 +65,7 @@ int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean
|
|||
|
||||
int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal) {
|
||||
if (NULL == cJSON_AddStringToObject((cJSON*)pJson, pName, pVal)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -87,8 +84,7 @@ int32_t tjsonAddItemToObject(SJson* pJson, const char* pName, SJson* pItem) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) {
|
||||
|
@ -96,8 +92,7 @@ int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj) {
|
||||
|
@ -106,18 +101,27 @@ int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void
|
|||
}
|
||||
|
||||
SJson* pJobj = tjsonCreateObject();
|
||||
if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) {
|
||||
if (NULL == pJobj) {
|
||||
return terrno;
|
||||
}
|
||||
int32_t rc = func(pObj, pJobj);
|
||||
if (rc != TSDB_CODE_SUCCESS) {
|
||||
tjsonDelete(pJobj);
|
||||
return TSDB_CODE_FAILED;
|
||||
return rc;
|
||||
}
|
||||
return tjsonAddItemToObject(pJson, pName, pJobj);
|
||||
}
|
||||
|
||||
int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
|
||||
SJson* pJobj = tjsonCreateObject();
|
||||
if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) {
|
||||
if (pJobj == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t rc = func(pObj, pJobj);
|
||||
if (rc != TSDB_CODE_SUCCESS) {
|
||||
tjsonDelete(pJobj);
|
||||
return TSDB_CODE_FAILED;
|
||||
return rc;
|
||||
}
|
||||
return tjsonAddItemToArray(pJson, pJobj);
|
||||
}
|
||||
|
@ -156,9 +160,21 @@ int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArr
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); }
|
||||
char* tjsonToString(const SJson* pJson) {
|
||||
char* p = cJSON_Print((cJSON*)pJson);
|
||||
if (!p) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); }
|
||||
char* tjsonToUnformattedString(const SJson* pJson) {
|
||||
char* p = cJSON_PrintUnformatted((cJSON*)pJson);
|
||||
if (!p) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName) { return cJSON_GetObjectItem(pJson, pName); }
|
||||
|
||||
|
|
Loading…
Reference in New Issue