diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 9efa517ac3..8b10860ef0 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -39,16 +39,16 @@ #define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} typedef struct { - int vgId; + int32_t vgId; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_NAME_LEN]; FCqWrite cqWrite; void *ahandle; - int num; // number of continuous streams + int32_t num; // number of continuous streams struct SCqObj *pHead; void *dbConn; - int master; + int32_t master; void *tmrCtrl; pthread_mutex_t mutex; } SCqContext; @@ -57,7 +57,7 @@ typedef struct SCqObj { tmr_h tmrId; uint64_t uid; int32_t tid; // table ID - int rowSize; // bytes of a row + int32_t rowSize; // bytes of a row char * sqlStr; // SQL string STSchema * pSchema; // pointer to schema array void * pStream; @@ -175,7 +175,7 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) { +void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) { SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); @@ -237,7 +237,7 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -static void doCreateStream(void *param, TAOS_RES *result, int code) { +static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { SCqObj* pObj = (SCqObj*)param; SCqContext* pContext = pObj->pContext; SSqlObj* pSql = (SSqlObj*)result; @@ -288,7 +288,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); - int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; + int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; char *buffer = calloc(size, 1); SWalHead *pHead = (SWalHead *)buffer; diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 4a23695a1a..7a0727f1b8 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -24,7 +24,7 @@ extern "C" { typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef struct { - int vgId; + int32_t vgId; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_NAME_LEN]; @@ -42,12 +42,12 @@ void cqStart(void *handle); void cqStop(void *handle); // cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); +void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema); // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate void cqDrop(void *handle); -extern int cqDebugFlag; +extern int32_t cqDebugFlag; #ifdef __cplusplus diff --git a/src/inc/twal.h b/src/inc/twal.h index 931cf5daba..c32bb87021 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -25,9 +25,15 @@ typedef enum { TAOS_WAL_FSYNC = 2 } EWalType; +typedef enum { + TAOS_WAL_NOT_KEEP = 0, + TAOS_WAL_KEEP = 1 +} EWalKeep; + typedef struct { int8_t msgType; - int8_t reserved[3]; + int8_t sver; + int8_t reserved[2]; int32_t len; uint64_t version; uint32_t signature; @@ -36,11 +42,10 @@ typedef struct { } SWalHead; typedef struct { - int32_t vgId; - int32_t fsyncPeriod; // millisecond - int8_t walLevel; // wal level - int8_t wals; // number of WAL files; - int8_t keep; // keep the wal file when closed + int32_t vgId; + int32_t fsyncPeriod; // millisecond + EWalType walLevel; // wal level + EWalKeep keep; // keep the wal file when closed } SWalCfg; typedef void * twalh; // WAL HANDLE @@ -58,7 +63,7 @@ int32_t walWrite(twalh, SWalHead *); void walFsync(twalh, bool forceFsync); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); -int64_t walGetVersion(twalh); +uint64_t walGetVersion(twalh); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8cd8de62ad..6b6a49db93 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -175,7 +175,7 @@ static void *sdbGetTableFromId(int32_t tableId) { } static int32_t sdbInitWal() { - SWalCfg walCfg = {.vgId = 1, .walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0}; + SWalCfg walCfg = {.vgId = 1, .walLevel = TAOS_WAL_FSYNC, .keep = TAOS_WAL_KEEP, .fsyncPeriod = 0}; char temp[TSDB_FILENAME_LEN]; sprintf(temp, "%s/wal", tsMnodeDir); tsSdbObj.wal = walOpen(temp, &walCfg); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 89ec277006..317d4904cb 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -41,8 +41,8 @@ typedef struct { int8_t status; int8_t role; int8_t accessState; - int64_t version; // current version - int64_t fversion; // version on saved data file + uint64_t version; // current version + uint64_t fversion; // version on saved data file void *wqueue; void *rqueue; void *wal; diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c index e8dd44b48f..e2e57a7566 100644 --- a/src/vnode/src/vnodeCfg.c +++ b/src/vnode/src/vnodeCfg.c @@ -39,8 +39,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression; pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel; pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod; - pVnode->walCfg.wals = vnodeMsg->cfg.wals; - pVnode->walCfg.keep = 0; + pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP; pVnode->syncCfg.replica = vnodeMsg->cfg.replications; pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f5b670f423..4cdf9f898a 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -227,6 +227,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->vgId = vnode; pVnode->status = TAOS_VN_STATUS_INIT; + pVnode->fversion = 0; pVnode->version = 0; pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); @@ -288,6 +289,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { vnodeCleanUp(pVnode); return terrno; } else { + pVnode->fversion = 0; pVnode->version = 0; } } @@ -302,6 +304,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { walRestore(pVnode->wal, pVnode, vnodeProcessWrite); if (pVnode->version == 0) { + pVnode->fversion = 0; pVnode->version = walGetVersion(pVnode->wal); } diff --git a/src/vnode/src/vnodeVersion.c b/src/vnode/src/vnodeVersion.c index 1d0695fb53..51f9ac8f3a 100644 --- a/src/vnode/src/vnodeVersion.c +++ b/src/vnode/src/vnodeVersion.c @@ -61,10 +61,10 @@ int32_t vnodeReadVersion(SVnodeObj *pVnode) { vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file); goto PARSE_VER_ERROR; } - pVnode->version = ver->valueint; + pVnode->version = (uint64_t)ver->valueint; terrno = TSDB_CODE_SUCCESS; - vInfo("vgId:%d, read %s successfully, version:%" PRId64, pVnode->vgId, file, pVnode->version); + vInfo("vgId:%d, read %s successfully, version:%" PRIu64, pVnode->vgId, file, pVnode->version); PARSE_VER_ERROR: if (content != NULL) free(content); @@ -89,7 +89,7 @@ int32_t vnodeSaveVersion(SVnodeObj *pVnode) { char * content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion); + len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -98,6 +98,6 @@ int32_t vnodeSaveVersion(SVnodeObj *pVnode) { free(content); terrno = 0; - vInfo("vgId:%d, successed to write %s, version:%" PRId64, pVnode->vgId, file, pVnode->fversion); + vInfo("vgId:%d, successed to write %s, version:%" PRIu64, pVnode->vgId, file, pVnode->fversion); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 2ae342244d..1f6a8f5546 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -128,7 +128,7 @@ void walClose(void *handle) { taosClose(pWal->fd); - if (!pWal->keep) { + if (pWal->keep != TAOS_WAL_KEEP) { int64_t fileId = -1; while (walGetNextFile(pWal, &fileId) >= 0) { snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 0d27ce1768..ecd0fd0ca0 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -41,7 +41,7 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } - if (pWal->keep) { + if (pWal->keep == TAOS_WAL_KEEP) { pWal->fileId = 0; } else { if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0; @@ -58,7 +58,7 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name); } - if (!pWal->keep) { + if (pWal->keep != TAOS_WAL_KEEP) { // remove the oldest wal file int64_t oldFileId = -1; if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) { @@ -144,12 +144,12 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { continue; } - wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); + wDebug("vgId:%d, file:%s, restore success", pWal->vgId, walName); count++; } - if (!pWal->keep) return TSDB_CODE_SUCCESS; + if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS; if (count == 0) { wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId); @@ -173,7 +173,6 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { if (handle == NULL) return -1; SWal *pWal = handle; - // for keep if (*fileId == 0) *fileId = -1; pthread_mutex_lock(&(pWal->mutex)); @@ -311,7 +310,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch return code; } -int64_t walGetVersion(twalh param) { +uint64_t walGetVersion(twalh param) { SWal *pWal = param; if (pWal == 0) return 0; diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index 14e439c072..7a473ed18c 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -37,7 +37,6 @@ int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { int main(int argc, char *argv[]) { char path[128] = "/home/jhtao/test/wal"; - int max = 3; int level = 2; int total = 5; int rows = 10000; @@ -47,8 +46,6 @@ int main(int argc, char *argv[]) { for (int i=1; i