fix bug
This commit is contained in:
parent
3202b93234
commit
e2fc62e35c
|
@ -88,6 +88,7 @@ typedef struct STSBuf {
|
|||
STSList tsData; // uncompressed raw ts data
|
||||
uint64_t numOfTotal;
|
||||
bool autoDelete;
|
||||
bool remainOpen;
|
||||
int32_t tsOrder; // order of timestamp in ts comp buffer
|
||||
STSCursor cur;
|
||||
} STSBuf;
|
||||
|
|
|
@ -3836,8 +3836,10 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
|
|||
STSBuf * pTSbuf = pInfo->pTSBuf;
|
||||
|
||||
tsBufFlush(pTSbuf);
|
||||
strcpy(pCtx->aOutputBuf, pTSbuf->path);
|
||||
|
||||
*(FILE **)pCtx->aOutputBuf = pTSbuf->f;
|
||||
|
||||
pTSbuf->remainOpen = true;
|
||||
tsBufDestroy(pTSbuf);
|
||||
doFinalizer(pCtx);
|
||||
}
|
||||
|
|
|
@ -2010,6 +2010,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) {
|
|||
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL);
|
||||
}
|
||||
|
||||
|
||||
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
if (pRuntimeEnv->pQuery == NULL) {
|
||||
return;
|
||||
|
@ -2021,6 +2022,16 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
qDebug("QInfo:%p teardown runtime env", pQInfo);
|
||||
cleanupResultRowInfo(&pRuntimeEnv->windowResInfo);
|
||||
|
||||
if (isTSCompQuery(pQuery)) {
|
||||
FILE *f = *(FILE **)pQuery->sdata[0]->data;
|
||||
|
||||
if (f) {
|
||||
fclose(f);
|
||||
*(FILE **)pQuery->sdata[0]->data = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (pRuntimeEnv->pCtx != NULL) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
@ -6949,10 +6960,10 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
|
|||
* TODO handle the case that the file is too large to send back one time
|
||||
*/
|
||||
if (isTSCompQuery(pQuery) && (*numOfRows) > 0) {
|
||||
struct stat fstat;
|
||||
if (stat(pQuery->sdata[0]->data, &fstat) == 0) {
|
||||
*numOfRows = fstat.st_size;
|
||||
return fstat.st_size;
|
||||
struct stat fStat;
|
||||
if (fstat(fileno(*(FILE **)pQuery->sdata[0]->data), &fStat) == 0) {
|
||||
*numOfRows = fStat.st_size;
|
||||
return fStat.st_size;
|
||||
} else {
|
||||
qError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno));
|
||||
return 0;
|
||||
|
@ -6968,15 +6979,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
|||
|
||||
// load data from file to msg buffer
|
||||
if (isTSCompQuery(pQuery)) {
|
||||
int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666);
|
||||
|
||||
FILE *f = *(FILE **)pQuery->sdata[0]->data;
|
||||
|
||||
// make sure file exist
|
||||
if (FD_VALID(fd)) {
|
||||
uint64_t s = lseek(fd, 0, SEEK_END);
|
||||
if (f) {
|
||||
off_t s = lseek(fileno(f), 0, SEEK_END);
|
||||
|
||||
qDebug("QInfo:%p ts comp data return, file:%s, size:%"PRId64, pQInfo, pQuery->sdata[0]->data, s);
|
||||
if (lseek(fd, 0, SEEK_SET) >= 0) {
|
||||
size_t sz = read(fd, data, (uint32_t) s);
|
||||
qDebug("QInfo:%p ts comp data return, file:%p, size:%"PRId64, pQInfo, f, s);
|
||||
if (fseek(f, 0, SEEK_SET) >= 0) {
|
||||
size_t sz = fread(data, 1, s, f);
|
||||
if(sz < s) { // todo handle error
|
||||
assert(0);
|
||||
}
|
||||
|
@ -6984,15 +6996,8 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
|||
UNUSED(s);
|
||||
}
|
||||
|
||||
close(fd);
|
||||
unlink(pQuery->sdata[0]->data);
|
||||
} else {
|
||||
// todo return the error code to client and handle invalid fd
|
||||
qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
|
||||
pQuery->sdata[0]->data, strerror(errno));
|
||||
if (fd != -1) {
|
||||
close(fd);
|
||||
}
|
||||
fclose(f);
|
||||
*(FILE **)pQuery->sdata[0]->data = NULL;
|
||||
}
|
||||
|
||||
// all data returned, set query over
|
||||
|
|
|
@ -19,6 +19,8 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
|||
if (pTSBuf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTSBuf->autoDelete = autoDelete;
|
||||
|
||||
taosGetTmpfilePath("join", pTSBuf->path);
|
||||
pTSBuf->f = fopen(pTSBuf->path, "w+");
|
||||
|
@ -26,6 +28,10 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
|||
free(pTSBuf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!autoDelete) {
|
||||
unlink(pTSBuf->path);
|
||||
}
|
||||
|
||||
if (NULL == allocResForTSBuf(pTSBuf)) {
|
||||
return NULL;
|
||||
|
@ -37,8 +43,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
|||
|
||||
tsBufResetPos(pTSBuf);
|
||||
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
||||
|
||||
pTSBuf->autoDelete = autoDelete;
|
||||
|
||||
pTSBuf->tsOrder = order;
|
||||
|
||||
return pTSBuf;
|
||||
|
@ -49,6 +54,8 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
|||
if (pTSBuf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTSBuf->autoDelete = autoDelete;
|
||||
|
||||
tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
|
||||
|
||||
|
@ -129,7 +136,6 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
|||
|
||||
// ascending by default
|
||||
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
||||
pTSBuf->autoDelete = autoDelete;
|
||||
|
||||
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
|
||||
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
|
||||
|
@ -147,8 +153,10 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
|
|||
|
||||
tfree(pTSBuf->pData);
|
||||
tfree(pTSBuf->block.payload);
|
||||
|
||||
fclose(pTSBuf->f);
|
||||
|
||||
if (!pTSBuf->remainOpen) {
|
||||
fclose(pTSBuf->f);
|
||||
}
|
||||
|
||||
if (pTSBuf->autoDelete) {
|
||||
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
|
||||
|
|
Loading…
Reference in New Issue