diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0579ae46bc..e2f9a58023 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -223,6 +223,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress") #define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories") #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting" +#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0409) //"Parse vnodes.json error") // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") diff --git a/include/util/tdef.h b/include/util/tdef.h index 66e5f28bde..0ad0f68f3f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -212,7 +212,7 @@ do { \ #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MIN_VNODES 64 -#define TSDB_MAX_VNODES 2048 +#define TSDB_MAX_VNODES 512 #define TSDB_MIN_VNODES_PER_DB 2 #define TSDB_MAX_VNODES_PER_DB 64 diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 24c56ea6a3..bcb9aea856 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -50,6 +50,7 @@ void *taosAllocateQitem(int size); void taosFreeQitem(void *pItem); int taosWriteQitem(taos_queue, void *pItem); int taosReadQitem(taos_queue, void **pItem); +bool taosQueueEmpty(taos_queue); taos_qall taosAllocateQall(); void taosFreeQall(taos_qall); diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index c7682539ef..8bf80ccff8 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "dnodeVnodes.h" #include "dnodeTransport.h" +#include "cJSON.h" #include "thash.h" +#include "tlockfree.h" #include "tqueue.h" #include "tstep.h" #include "tthread.h" @@ -56,7 +58,7 @@ static struct { SSteps 
*pSteps; int32_t openVnodes; int32_t totalVnodes; - char file[PATH_MAX + 20]; + SRWLatch latch; } tsVnodes; static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode); @@ -70,6 +72,28 @@ static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode); static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode); static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode); +static SVnodeObj *dnodeAcquireVnode(int32_t vgId) { /* looks vgId up under the read latch; returns the vnode with refCount incremented, or NULL with terrno set when vgId is not in the hash */ + SVnodeObj *pVnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&tsVnodes.latch); + taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode); + if (pVnode == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + } else { + refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + } + taosRUnLockLatch(&tsVnodes.latch); + + if (pVnode != NULL) dTrace("vgId:%d, accquire vnode, refCount:%d", vgId, refCount); /* pVnode is NULL on a failed lookup, so it must not be dereferenced for the trace */ + return pVnode; +} + +static void dnodeReleaseVnode(SVnodeObj *pVnode) { + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); +} + static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { @@ -107,13 +131,27 @@ static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { return code; } - return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + taosWLockLatch(&tsVnodes.latch); + code = taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + taosWUnLockLatch(&tsVnodes.latch); + + return code; } static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { + taosWLockLatch(&tsVnodes.latch); taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); + taosWUnLockLatch(&tsVnodes.latch); + + // wait all queue empty + dnodeReleaseVnode(pVnode); + while (pVnode->refCount > 0) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); + while
(!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - //todo wait all queue empty dnodeFreeVnodeQueryQueue(pVnode); dnodeFreeVnodeFetchQueue(pVnode); dnodeFreeVnodeWriteQueue(pVnode); @@ -121,35 +159,164 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { dnodeFreeVnodeSyncQueue(pVnode); } -static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { +static SVnodeObj **dnodeGetVnodesFromHash(int32_t *numOfVnodes) { /* snapshots the hash under the read latch into a calloc'd array, incrementing each vnode's refCount; *numOfVnodes is the number of filled slots; returns NULL with *numOfVnodes == 0 if the array cannot be allocated */ + taosRLockLatch(&tsVnodes.latch); + + int32_t num = 0; + int32_t size = taosHashGetSize(tsVnodes.hash); + SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *)); + void *pIter = taosHashIterate(tsVnodes.hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; - if (*ppVnode) { - (*numOfVnodes)++; - if (*numOfVnodes >= TSDB_MAX_VNODES) { - dError("vgId:%d, too many open vnodes, exist:%d max:%d", (*ppVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); - continue; - } else { - pVnodes[*numOfVnodes - 1] = (*ppVnode); + SVnodeObj *pVnode = *ppVnode; + if (pVnode && pVnodes) { + /* fill slots 0..num-1 contiguously and count only stored entries, because callers index the result from 0 */ + if (num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); + pVnodes[num++] = pVnode; } } - pIter = taosHashIterate(tsVnodes.hash, pIter); } - return TSDB_CODE_SUCCESS; + taosRUnLockLatch(&tsVnodes.latch); + *numOfVnodes = num; + + return pVnodes; } -static int32_t dnodeGetVnodesFromFile(SVnodeObj *pVnodes, int32_t *numOfVnodes) { - pVnodes[0].vgId = 2; - pVnodes[0].dropped = 0; - pVnodes[0].vgId = 3; - pVnodes[0].dropped = 0; - return 0; +static int32_t dnodeGetVnodesFromFile(SVnodeObj **ppVnodes, int32_t *numOfVnodes) { + int32_t code = TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; + char file[PATH_MAX + 20] = {0}; + SVnodeObj
*pVnodes = NULL; + + snprintf(file, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir); + + fp = fopen(file, "r"); + if (!fp) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_VNODE_OVER; + } + + len = (content != NULL) ? (int32_t)fread(content, 1, maxLen, fp) : 0; /* content is NULL if its calloc failed; treat that like an unreadable file */ + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_VNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_VNODE_OVER; + } + + cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes"); + if (!vnodes || vnodes->type != cJSON_Array) { + dError("failed to read %s since vnodes not found", file); + goto PRASE_VNODE_OVER; + } + + int32_t vnodesNum = cJSON_GetArraySize(vnodes); + if (vnodesNum <= 0) { + code = 0; /* an empty vnodes array is a valid file: there is nothing to open, and the outputs stay untouched */ + goto PRASE_VNODE_OVER; + } + + pVnodes = calloc(vnodesNum, sizeof(SVnodeObj)); + if (pVnodes == NULL) { + dError("failed to read %s since out of memory", file); + goto PRASE_VNODE_OVER; + } + + for (int32_t i = 0; i < vnodesNum; ++i) { + cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + SVnodeObj *pVnode = &pVnodes[i]; + + cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); + if (!vgId || vgId->type != cJSON_String) { + dError("failed to read %s since vgId not found", file); + goto PRASE_VNODE_OVER; + } + pVnode->vgId = atoi(vgId->valuestring); + + cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); + if (!dropped || dropped->type != cJSON_String) { + dError("failed to read %s since dropped not found", file); + goto PRASE_VNODE_OVER; + } + pVnode->dropped = atoi(dropped->valuestring); + } + *ppVnodes = pVnodes; /* publish the parsed list; ownership of the array passes to the caller, which must free() it */ + *numOfVnodes = vnodesNum; + code = 0; + dInfo("succcessed to read file %s", file); +PRASE_VNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + if (code != 0) free(pVnodes); /* error path: the partially filled array was never published, so release it here */ + return code; } -static int32_t dnodeWriteVnodesToFile() { return 0; } +static int32_t dnodeWriteVnodesToFile() { + char file[PATH_MAX +
20] = {0}; + char realfile[PATH_MAX + 20] = {0}; + snprintf(file, PATH_MAX + 20, "%s/vnodes.json.bak", tsVnodeDir); + snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir); + + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + FILE *fp = (content != NULL) ? fopen(file, "w") : NULL; /* skip the open when the buffer is missing so both failures share one exit */ + if (!fp) { + dError("failed to write %s since %s", file, strerror(errno)); + free(content); + return -1; + } + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); /* each element opens its own brace so an empty list still yields well-formed JSON */ + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + len += snprintf(content + len, maxLen - len, " {\n \"vgId\": \"%d\",\n", pVnode->vgId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped); + if (i < numOfVnodes - 1) { + len += snprintf(content + len, maxLen - len, " },\n"); + } else { + len += snprintf(content + len, maxLen - len, " }\n"); + } + } + len += snprintf(content + len, maxLen - len, " ]\n}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + terrno = 0; + + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + dnodeReleaseVnode(pVnode); + } + + if (pVnodes != NULL) { + free(pVnodes); + } + + dInfo("successed to write %s", file); + return taosRenameFile(file, realfile); +} static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) { int32_t code = 0; @@ -193,24 +360,6 @@ static int32_t dnodeDropVnode(SVnodeObj *pVnode) { return 0; } -static SVnodeObj *dnodeAcquireVnode(int32_t vgId) { - SVnodeObj *pVnode = NULL; - - taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode); - if (pVnode == NULL) { - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - } - - int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount); - return pVnode;
-} - -static void dnodeReleaseVnode(SVnodeObj *pVnode) { - int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); -} - static void *dnodeOpenVnodeFunc(void *param) { SVThread *pThread = param; @@ -246,15 +395,17 @@ static void *dnodeOpenVnodeFunc(void *param) { } static int32_t dnodeOpenVnodes() { + taosInitRWLatch(&tsVnodes.latch); + tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVnodes.hash == NULL) { dError("failed to init vnode hash"); return TSDB_CODE_VND_OUT_OF_MEMORY; } - SVnodeObj pVnodes[TSDB_MAX_VNODES] = {0}; - int32_t numOfVnodes = 0; - int32_t code = dnodeGetVnodesFromFile(pVnodes, &numOfVnodes); + SVnodeObj *pVnodes = NULL; + int32_t numOfVnodes = 0; + int32_t code = dnodeGetVnodesFromFile(&pVnodes, &numOfVnodes); if (code != TSDB_CODE_SUCCESS) { dInfo("failed to get vnode list from disk since %s", tstrerror(code)); return code; @@ -308,17 +459,14 @@ static int32_t dnodeOpenVnodes() { } static void dnodeCloseVnodes() { - SVnodeObj *pVnodes[TSDB_MAX_VNODES] = {0}; - int32_t numOfVnodes = 0; - - int32_t code = dnodeGetVnodesFromHash(pVnodes, &numOfVnodes); - if (code != TSDB_CODE_SUCCESS) { - dInfo("failed to get dnode list since code %d", code); - return; - } + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - vnodeClose(pVnodes[i]->pImpl); + dnodeDropVnodeWrapper(pVnodes[i]); + } + if (pVnodes != NULL) { + free(pVnodes); } if (tsVnodes.hash != NULL) { @@ -431,12 +579,12 @@ static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) { return code; } - code = vnodeDrop(pVnode->pImpl); + code = dnodeDropVnode(pVnode); if (code != 0) { + dnodeReleaseVnode(pVnode); dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code)); } - dnodeReleaseVnode(pVnode); return code; } diff --git a/source/util/src/terror.c 
b/source/util/src/terror.c index 22fbeb1883..6838bab403 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -235,6 +235,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message lengt TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories") TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR, "Parse vnodes.json error") // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress") diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 2813a55fea..5d6a507172 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -98,6 +98,20 @@ void taosCloseQueue(taos_queue param) { uTrace("queue:%p is closed", queue); } +bool taosQueueEmpty(taos_queue param) { /* returns true for a NULL queue or when both head and tail are NULL, sampled while holding the queue mutex */ + if (param == NULL) return true; + STaosQueue *queue = (STaosQueue *)param; + + bool empty = false; + pthread_mutex_lock(&queue->mutex); + if (queue->head == NULL && queue->tail == NULL) { + empty = true; + } + pthread_mutex_unlock(&queue->mutex); /* release the lock taken above; this query must leave the mutex usable */ + + return empty; +} + void *taosAllocateQitem(int size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);