commit
8dc6056a2b
|
@ -414,6 +414,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheck
|
||||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
|
|
||||||
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
|
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
|
||||||
|
STsdbRepo *pRepo = pQueryHandle->pTsdb;
|
||||||
SCompData* data = calloc(1, sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
|
SCompData* data = calloc(1, sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
|
||||||
|
|
||||||
data->numOfCols = pBlock->numOfCols;
|
data->numOfCols = pBlock->numOfCols;
|
||||||
|
@ -423,7 +424,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
||||||
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
||||||
|
|
||||||
if (pCheckInfo->pDataCols == NULL) {
|
if (pCheckInfo->pDataCols == NULL) {
|
||||||
pCheckInfo->pDataCols = tdNewDataCols(1000, 100, 4096); //todo fix me
|
pCheckInfo->pDataCols = tdNewDataCols(pRepo->tsdbMeta->maxRowBytes, pRepo->tsdbMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
|
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
|
||||||
|
|
|
@ -49,6 +49,7 @@ typedef struct {
|
||||||
STsdbCfg tsdbCfg;
|
STsdbCfg tsdbCfg;
|
||||||
SSyncCfg syncCfg;
|
SSyncCfg syncCfg;
|
||||||
SWalCfg walCfg;
|
SWalCfg walCfg;
|
||||||
|
char * rootDir;
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
int vnodeWriteToQueue(void *param, void *pHead, int type);
|
int vnodeWriteToQueue(void *param, void *pHead, int type);
|
||||||
|
|
|
@ -182,6 +182,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->refCount = 1;
|
pVnode->refCount = 1;
|
||||||
pVnode->version = 0;
|
pVnode->version = 0;
|
||||||
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
||||||
|
pVnode->rootDir = strdup(rootDir);
|
||||||
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
|
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
|
||||||
|
|
||||||
int32_t code = vnodeReadCfg(pVnode);
|
int32_t code = vnodeReadCfg(pVnode);
|
||||||
|
@ -271,6 +272,7 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tfree(pVnode->rootDir);
|
||||||
// remove read queue
|
// remove read queue
|
||||||
dnodeFreeRqueue(pVnode->rqueue);
|
dnodeFreeRqueue(pVnode->rqueue);
|
||||||
pVnode->rqueue = NULL;
|
pVnode->rqueue = NULL;
|
||||||
|
@ -406,7 +408,15 @@ static void vnodeNotifyFileSynced(void *ahandle) {
|
||||||
SVnodeObj *pVnode = ahandle;
|
SVnodeObj *pVnode = ahandle;
|
||||||
vTrace("vgId:%d, data file is synced", pVnode->vgId);
|
vTrace("vgId:%d, data file is synced", pVnode->vgId);
|
||||||
|
|
||||||
// close tsdb, then open tsdb
|
char rootDir[128] = "\0";
|
||||||
|
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
|
||||||
|
// clsoe tsdb, then open tsdb
|
||||||
|
tsdbCloseRepo(pVnode->tsdb);
|
||||||
|
STsdbAppH appH = {0};
|
||||||
|
appH.appH = (void *)pVnode;
|
||||||
|
appH.walCallBack = vnodeWalCallback;
|
||||||
|
appH.cqH = pVnode->cq;
|
||||||
|
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
Loading…
Reference in New Issue