[Dnode] Table driven method to init and cleanup dnode components.
This patch refactors the initialization and cleanup path of dnode with a table driven method. This fixes the following issues: 1. Before the patch, if dnodeInitRead() fails, the cleanup path also runs dnodeCleanupWrite(), which will free wWorkerPool.writeWorker that's never allocated. (The code before this patch will not crash though, because wWorkerPool is zero-initialized global variable and therefore the accidental free will be a nop). 2. In general the order of calling cleanup function should be reverse to the order of calling init function, but this is not the case prior to this patch (see dnodeCleanupMnode() and dnodeCleanupMgmt()). * Bonus fix This patch also fixes a missing free for readPool.readWorker. * Testing I plan to run the test script ./test-all.sh, but was not able to do so. Is there a reference somewhere I can look up?
This commit is contained in:
parent
0d09f6c0b6
commit
e061343664
|
@ -22,7 +22,7 @@ extern "C" {
|
|||
|
||||
int32_t dnodeInitModules();
|
||||
void dnodeStartModules();
|
||||
void dnodeCleanUpModules();
|
||||
void dnodeCleanupModules();
|
||||
void dnodeProcessModuleStatus(uint32_t moduleStatus);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -41,6 +41,25 @@ static void dnodeSetRunStatus(SDnodeRunStatus status);
|
|||
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
|
||||
static void dnodeCheckDataDirOpenned(char *dir);
|
||||
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
|
||||
static int32_t dnodeInitSteps();
|
||||
static void dnodeCleanupSteps(int32_t stepId);
|
||||
|
||||
typedef struct {
|
||||
const char *const name;
|
||||
int (*init)();
|
||||
void (*cleanup)();
|
||||
} DnodeStep;
|
||||
|
||||
static const DnodeStep DnodeSteps[] = {
|
||||
{"storage", dnodeInitStorage, dnodeCleanupStorage},
|
||||
{"read", dnodeInitRead, dnodeCleanupRead},
|
||||
{"write", dnodeInitWrite, dnodeCleanupWrite},
|
||||
{"mclient", dnodeInitMClient, dnodeCleanupMClient},
|
||||
{"modules", dnodeInitModules, dnodeCleanupModules},
|
||||
{"mnode", dnodeInitMnode, dnodeCleanupMnode},
|
||||
{"mgmt", dnodeInitMgmt, dnodeCleanupMgmt},
|
||||
{"shell", dnodeInitShell, dnodeCleanupShell},
|
||||
};
|
||||
|
||||
int32_t main(int32_t argc, char *argv[]) {
|
||||
// Set global configuration file
|
||||
|
@ -53,11 +72,11 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
exit(EXIT_FAILURE);
|
||||
}
|
||||
} else if (strcmp(argv[i], "-V") == 0) {
|
||||
#ifdef _SYNC
|
||||
#ifdef _SYNC
|
||||
char *versionStr = "enterprise";
|
||||
#else
|
||||
#else
|
||||
char *versionStr = "community";
|
||||
#endif
|
||||
#endif
|
||||
printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version);
|
||||
printf("gitinfo: %s\n", gitinfo);
|
||||
printf("gitinfoI: %s\n", gitinfoOfInternal);
|
||||
|
@ -102,8 +121,6 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
if (dnodeInitSystem() < 0) {
|
||||
syslog(LOG_ERR, "Error initialize TDengine system");
|
||||
closelog();
|
||||
|
||||
dnodeCleanUpSystem();
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
|
@ -135,6 +152,24 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
|
|||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
static void dnodeCleanupSteps(int32_t stepId) {
|
||||
for (int32_t i = stepId; i >= 0; i--) {
|
||||
DnodeSteps[i].cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t dnodeInitSteps() {
|
||||
int32_t code = 0;
|
||||
for (int32_t i = 0; i < sizeof(DnodeSteps) / sizeof(DnodeSteps[0]); i++) {
|
||||
if (DnodeSteps[i].init() != 0) {
|
||||
dnodeCleanupSteps(i);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t dnodeInitSystem() {
|
||||
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
|
||||
tscEmbedded = 1;
|
||||
|
@ -164,14 +199,9 @@ static int32_t dnodeInitSystem() {
|
|||
|
||||
dPrint("start to initialize TDengine on %s", tsLocalEp);
|
||||
|
||||
if (dnodeInitStorage() != 0) return -1;
|
||||
if (dnodeInitRead() != 0) return -1;
|
||||
if (dnodeInitWrite() != 0) return -1;
|
||||
if (dnodeInitMClient() != 0) return -1;
|
||||
if (dnodeInitModules() != 0) return -1;
|
||||
if (dnodeInitMnode() != 0) return -1;
|
||||
if (dnodeInitMgmt() != 0) return -1;
|
||||
if (dnodeInitShell() != 0) return -1;
|
||||
if (dnodeInitSteps() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
dnodeStartModules();
|
||||
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
|
||||
|
@ -184,16 +214,8 @@ static int32_t dnodeInitSystem() {
|
|||
static void dnodeCleanUpSystem() {
|
||||
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
|
||||
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
|
||||
dnodeCleanupShell();
|
||||
dnodeCleanupMnode();
|
||||
dnodeCleanupMgmt();
|
||||
dnodeCleanupMClient();
|
||||
dnodeCleanupWrite();
|
||||
dnodeCleanupRead();
|
||||
dnodeCleanUpModules();
|
||||
dnodeCleanupSteps(sizeof(DnodeSteps) / sizeof(DnodeSteps[0]) - 1);
|
||||
taos_cleanup();
|
||||
dnodeCleanupStorage();
|
||||
taosCloseLog();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ static void dnodeAllocModules() {
|
|||
}
|
||||
}
|
||||
|
||||
void dnodeCleanUpModules() {
|
||||
void dnodeCleanupModules() {
|
||||
for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) {
|
||||
if (tsModule[module].enable && tsModule[module].stopFp) {
|
||||
(*tsModule[module].stopFp)();
|
||||
|
|
|
@ -34,7 +34,7 @@ typedef struct {
|
|||
} SReadMsg;
|
||||
|
||||
typedef struct {
|
||||
pthread_t thread; // thread
|
||||
pthread_t thread; // thread
|
||||
int32_t workerId; // worker ID
|
||||
} SReadWorker;
|
||||
|
||||
|
@ -74,10 +74,10 @@ void dnodeCleanupRead() {
|
|||
|
||||
for (int i=0; i < readPool.max; ++i) {
|
||||
SReadWorker *pWorker = readPool.readWorker + i;
|
||||
if (pWorker->thread)
|
||||
if (pWorker->thread)
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
|
||||
free(readPool.readWorker);
|
||||
taosCloseQset(readQset);
|
||||
dPrint("dnode read is closed");
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ void dnodeRead(SRpcMsg *pMsg) {
|
|||
int32_t queuedMsgNum = 0;
|
||||
int32_t leftLen = pMsg->contLen;
|
||||
char *pCont = (char *) pMsg->pCont;
|
||||
void *pVnode;
|
||||
void *pVnode;
|
||||
|
||||
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
|
||||
|
||||
|
@ -159,7 +159,7 @@ void *dnodeAllocateRqueue(void *pVnode) {
|
|||
} while (readPool.num < readPool.min);
|
||||
}
|
||||
|
||||
dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
|
||||
dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
@ -170,13 +170,13 @@ void dnodeFreeRqueue(void *rqueue) {
|
|||
// dynamically adjust the number of threads
|
||||
}
|
||||
|
||||
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
|
||||
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
|
||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||
pRead->rpcMsg = pMsg->rpcMsg;
|
||||
pRead->pCont = qhandle;
|
||||
pRead->contLen = 0;
|
||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||
|
||||
|
||||
taos_queue queue = vnodeGetRqueue(pVnode);
|
||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue