commit
ae67adfd23
|
@ -39,16 +39,16 @@
|
||||||
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int vgId;
|
int32_t vgId;
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char pass[TSDB_PASSWORD_LEN];
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
char db[TSDB_DB_NAME_LEN];
|
char db[TSDB_DB_NAME_LEN];
|
||||||
FCqWrite cqWrite;
|
FCqWrite cqWrite;
|
||||||
void *ahandle;
|
void *ahandle;
|
||||||
int num; // number of continuous streams
|
int32_t num; // number of continuous streams
|
||||||
struct SCqObj *pHead;
|
struct SCqObj *pHead;
|
||||||
void *dbConn;
|
void *dbConn;
|
||||||
int master;
|
int32_t master;
|
||||||
void *tmrCtrl;
|
void *tmrCtrl;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} SCqContext;
|
} SCqContext;
|
||||||
|
@ -57,7 +57,7 @@ typedef struct SCqObj {
|
||||||
tmr_h tmrId;
|
tmr_h tmrId;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
int32_t tid; // table ID
|
int32_t tid; // table ID
|
||||||
int rowSize; // bytes of a row
|
int32_t rowSize; // bytes of a row
|
||||||
char * sqlStr; // SQL string
|
char * sqlStr; // SQL string
|
||||||
STSchema * pSchema; // pointer to schema array
|
STSchema * pSchema; // pointer to schema array
|
||||||
void * pStream;
|
void * pStream;
|
||||||
|
@ -175,7 +175,7 @@ void cqStop(void *handle) {
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) {
|
void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) {
|
||||||
SCqContext *pContext = handle;
|
SCqContext *pContext = handle;
|
||||||
|
|
||||||
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
|
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
|
||||||
|
@ -237,7 +237,7 @@ void cqDrop(void *handle) {
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCreateStream(void *param, TAOS_RES *result, int code) {
|
static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
|
||||||
SCqObj* pObj = (SCqObj*)param;
|
SCqObj* pObj = (SCqObj*)param;
|
||||||
SCqContext* pContext = pObj->pContext;
|
SCqContext* pContext = pObj->pContext;
|
||||||
SSqlObj* pSql = (SSqlObj*)result;
|
SSqlObj* pSql = (SSqlObj*)result;
|
||||||
|
@ -288,7 +288,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
||||||
|
|
||||||
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
|
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
|
||||||
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
|
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
|
||||||
char *buffer = calloc(size, 1);
|
char *buffer = calloc(size, 1);
|
||||||
|
|
||||||
SWalHead *pHead = (SWalHead *)buffer;
|
SWalHead *pHead = (SWalHead *)buffer;
|
||||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
||||||
typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
|
typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int vgId;
|
int32_t vgId;
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char pass[TSDB_PASSWORD_LEN];
|
char pass[TSDB_PASSWORD_LEN];
|
||||||
char db[TSDB_DB_NAME_LEN];
|
char db[TSDB_DB_NAME_LEN];
|
||||||
|
@ -42,12 +42,12 @@ void cqStart(void *handle);
|
||||||
void cqStop(void *handle);
|
void cqStop(void *handle);
|
||||||
|
|
||||||
// cqCreate is called by TSDB to start an instance of CQ
|
// cqCreate is called by TSDB to start an instance of CQ
|
||||||
void *cqCreate(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
|
void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema);
|
||||||
|
|
||||||
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
||||||
void cqDrop(void *handle);
|
void cqDrop(void *handle);
|
||||||
|
|
||||||
extern int cqDebugFlag;
|
extern int32_t cqDebugFlag;
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -25,9 +25,15 @@ typedef enum {
|
||||||
TAOS_WAL_FSYNC = 2
|
TAOS_WAL_FSYNC = 2
|
||||||
} EWalType;
|
} EWalType;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TAOS_WAL_NOT_KEEP = 0,
|
||||||
|
TAOS_WAL_KEEP = 1
|
||||||
|
} EWalKeep;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t msgType;
|
int8_t msgType;
|
||||||
int8_t reserved[3];
|
int8_t sver;
|
||||||
|
int8_t reserved[2];
|
||||||
int32_t len;
|
int32_t len;
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
uint32_t signature;
|
uint32_t signature;
|
||||||
|
@ -36,11 +42,10 @@ typedef struct {
|
||||||
} SWalHead;
|
} SWalHead;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t fsyncPeriod; // millisecond
|
int32_t fsyncPeriod; // millisecond
|
||||||
int8_t walLevel; // wal level
|
EWalType walLevel; // wal level
|
||||||
int8_t wals; // number of WAL files;
|
EWalKeep keep; // keep the wal file when closed
|
||||||
int8_t keep; // keep the wal file when closed
|
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
typedef void * twalh; // WAL HANDLE
|
typedef void * twalh; // WAL HANDLE
|
||||||
|
@ -58,7 +63,7 @@ int32_t walWrite(twalh, SWalHead *);
|
||||||
void walFsync(twalh, bool forceFsync);
|
void walFsync(twalh, bool forceFsync);
|
||||||
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
||||||
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
|
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
|
||||||
int64_t walGetVersion(twalh);
|
uint64_t walGetVersion(twalh);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,7 +175,7 @@ static void *sdbGetTableFromId(int32_t tableId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbInitWal() {
|
static int32_t sdbInitWal() {
|
||||||
SWalCfg walCfg = {.vgId = 1, .walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0};
|
SWalCfg walCfg = {.vgId = 1, .walLevel = TAOS_WAL_FSYNC, .keep = TAOS_WAL_KEEP, .fsyncPeriod = 0};
|
||||||
char temp[TSDB_FILENAME_LEN];
|
char temp[TSDB_FILENAME_LEN];
|
||||||
sprintf(temp, "%s/wal", tsMnodeDir);
|
sprintf(temp, "%s/wal", tsMnodeDir);
|
||||||
tsSdbObj.wal = walOpen(temp, &walCfg);
|
tsSdbObj.wal = walOpen(temp, &walCfg);
|
||||||
|
|
|
@ -41,8 +41,8 @@ typedef struct {
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t role;
|
int8_t role;
|
||||||
int8_t accessState;
|
int8_t accessState;
|
||||||
int64_t version; // current version
|
uint64_t version; // current version
|
||||||
int64_t fversion; // version on saved data file
|
uint64_t fversion; // version on saved data file
|
||||||
void *wqueue;
|
void *wqueue;
|
||||||
void *rqueue;
|
void *rqueue;
|
||||||
void *wal;
|
void *wal;
|
||||||
|
|
|
@ -39,8 +39,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
|
||||||
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
|
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
|
||||||
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
|
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
|
||||||
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
|
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
|
||||||
pVnode->walCfg.wals = vnodeMsg->cfg.wals;
|
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
|
||||||
pVnode->walCfg.keep = 0;
|
|
||||||
pVnode->syncCfg.replica = vnodeMsg->cfg.replications;
|
pVnode->syncCfg.replica = vnodeMsg->cfg.replications;
|
||||||
pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum;
|
pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum;
|
||||||
|
|
||||||
|
|
|
@ -227,6 +227,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
|
|
||||||
pVnode->vgId = vnode;
|
pVnode->vgId = vnode;
|
||||||
pVnode->status = TAOS_VN_STATUS_INIT;
|
pVnode->status = TAOS_VN_STATUS_INIT;
|
||||||
|
pVnode->fversion = 0;
|
||||||
pVnode->version = 0;
|
pVnode->version = 0;
|
||||||
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
||||||
pVnode->rootDir = strdup(rootDir);
|
pVnode->rootDir = strdup(rootDir);
|
||||||
|
@ -288,6 +289,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
vnodeCleanUp(pVnode);
|
vnodeCleanUp(pVnode);
|
||||||
return terrno;
|
return terrno;
|
||||||
} else {
|
} else {
|
||||||
|
pVnode->fversion = 0;
|
||||||
pVnode->version = 0;
|
pVnode->version = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -302,6 +304,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
|
|
||||||
walRestore(pVnode->wal, pVnode, vnodeProcessWrite);
|
walRestore(pVnode->wal, pVnode, vnodeProcessWrite);
|
||||||
if (pVnode->version == 0) {
|
if (pVnode->version == 0) {
|
||||||
|
pVnode->fversion = 0;
|
||||||
pVnode->version = walGetVersion(pVnode->wal);
|
pVnode->version = walGetVersion(pVnode->wal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,10 +61,10 @@ int32_t vnodeReadVersion(SVnodeObj *pVnode) {
|
||||||
vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file);
|
vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file);
|
||||||
goto PARSE_VER_ERROR;
|
goto PARSE_VER_ERROR;
|
||||||
}
|
}
|
||||||
pVnode->version = ver->valueint;
|
pVnode->version = (uint64_t)ver->valueint;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
vInfo("vgId:%d, read %s successfully, version:%" PRId64, pVnode->vgId, file, pVnode->version);
|
vInfo("vgId:%d, read %s successfully, version:%" PRIu64, pVnode->vgId, file, pVnode->version);
|
||||||
|
|
||||||
PARSE_VER_ERROR:
|
PARSE_VER_ERROR:
|
||||||
if (content != NULL) free(content);
|
if (content != NULL) free(content);
|
||||||
|
@ -89,7 +89,7 @@ int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
||||||
char * content = calloc(1, maxLen + 1);
|
char * content = calloc(1, maxLen + 1);
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion);
|
len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion);
|
||||||
len += snprintf(content + len, maxLen - len, "}\n");
|
len += snprintf(content + len, maxLen - len, "}\n");
|
||||||
|
|
||||||
fwrite(content, 1, len, fp);
|
fwrite(content, 1, len, fp);
|
||||||
|
@ -98,6 +98,6 @@ int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
||||||
free(content);
|
free(content);
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
vInfo("vgId:%d, successed to write %s, version:%" PRId64, pVnode->vgId, file, pVnode->fversion);
|
vInfo("vgId:%d, successed to write %s, version:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
|
@ -128,7 +128,7 @@ void walClose(void *handle) {
|
||||||
|
|
||||||
taosClose(pWal->fd);
|
taosClose(pWal->fd);
|
||||||
|
|
||||||
if (!pWal->keep) {
|
if (pWal->keep != TAOS_WAL_KEEP) {
|
||||||
int64_t fileId = -1;
|
int64_t fileId = -1;
|
||||||
while (walGetNextFile(pWal, &fileId) >= 0) {
|
while (walGetNextFile(pWal, &fileId) >= 0) {
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||||
|
|
|
@ -41,7 +41,7 @@ int32_t walRenew(void *handle) {
|
||||||
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
|
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal->keep) {
|
if (pWal->keep == TAOS_WAL_KEEP) {
|
||||||
pWal->fileId = 0;
|
pWal->fileId = 0;
|
||||||
} else {
|
} else {
|
||||||
if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;
|
if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;
|
||||||
|
@ -58,7 +58,7 @@ int32_t walRenew(void *handle) {
|
||||||
wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
|
wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pWal->keep) {
|
if (pWal->keep != TAOS_WAL_KEEP) {
|
||||||
// remove the oldest wal file
|
// remove the oldest wal file
|
||||||
int64_t oldFileId = -1;
|
int64_t oldFileId = -1;
|
||||||
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
|
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
|
||||||
|
@ -144,12 +144,12 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
|
wDebug("vgId:%d, file:%s, restore success", pWal->vgId, walName);
|
||||||
|
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pWal->keep) return TSDB_CODE_SUCCESS;
|
if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
|
wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
|
||||||
|
@ -173,7 +173,6 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
|
||||||
if (handle == NULL) return -1;
|
if (handle == NULL) return -1;
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
|
|
||||||
// for keep
|
|
||||||
if (*fileId == 0) *fileId = -1;
|
if (*fileId == 0) *fileId = -1;
|
||||||
|
|
||||||
pthread_mutex_lock(&(pWal->mutex));
|
pthread_mutex_lock(&(pWal->mutex));
|
||||||
|
@ -311,7 +310,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t walGetVersion(twalh param) {
|
uint64_t walGetVersion(twalh param) {
|
||||||
SWal *pWal = param;
|
SWal *pWal = param;
|
||||||
if (pWal == 0) return 0;
|
if (pWal == 0) return 0;
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,6 @@ int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
char path[128] = "/home/jhtao/test/wal";
|
char path[128] = "/home/jhtao/test/wal";
|
||||||
int max = 3;
|
|
||||||
int level = 2;
|
int level = 2;
|
||||||
int total = 5;
|
int total = 5;
|
||||||
int rows = 10000;
|
int rows = 10000;
|
||||||
|
@ -47,8 +46,6 @@ int main(int argc, char *argv[]) {
|
||||||
for (int i=1; i<argc; ++i) {
|
for (int i=1; i<argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||||
tstrncpy(path, argv[++i], sizeof(path));
|
tstrncpy(path, argv[++i], sizeof(path));
|
||||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
|
||||||
max = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
||||||
level = atoi(argv[++i]);
|
level = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
||||||
|
@ -66,7 +63,6 @@ int main(int argc, char *argv[]) {
|
||||||
} else {
|
} else {
|
||||||
printf("\nusage: %s [options] \n", argv[0]);
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
printf(" [-p path]: wal file path default is:%s\n", path);
|
printf(" [-p path]: wal file path default is:%s\n", path);
|
||||||
printf(" [-m max]: max wal files, default is:%d\n", max);
|
|
||||||
printf(" [-l level]: log level, default is:%d\n", level);
|
printf(" [-l level]: log level, default is:%d\n", level);
|
||||||
printf(" [-t total]: total wal files, default is:%d\n", total);
|
printf(" [-t total]: total wal files, default is:%d\n", total);
|
||||||
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
|
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
|
||||||
|
@ -82,7 +78,6 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
SWalCfg walCfg;
|
SWalCfg walCfg;
|
||||||
walCfg.walLevel = level;
|
walCfg.walLevel = level;
|
||||||
walCfg.wals = max;
|
|
||||||
walCfg.keep = keep;
|
walCfg.keep = keep;
|
||||||
|
|
||||||
pWal = walOpen(path, &walCfg);
|
pWal = walOpen(path, &walCfg);
|
||||||
|
|
Loading…
Reference in New Issue