Merge pull request #4845 from taosdata/hotfix/TD-2673
[TD-2673]<fix>:some tmp files are left in server's tmp directory
This commit is contained in:
commit
09e1aeaa2d
|
@ -88,6 +88,7 @@ typedef struct STSBuf {
|
||||||
STSList tsData; // uncompressed raw ts data
|
STSList tsData; // uncompressed raw ts data
|
||||||
uint64_t numOfTotal;
|
uint64_t numOfTotal;
|
||||||
bool autoDelete;
|
bool autoDelete;
|
||||||
|
bool remainOpen;
|
||||||
int32_t tsOrder; // order of timestamp in ts comp buffer
|
int32_t tsOrder; // order of timestamp in ts comp buffer
|
||||||
STSCursor cur;
|
STSCursor cur;
|
||||||
} STSBuf;
|
} STSBuf;
|
||||||
|
|
|
@ -3836,8 +3836,10 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
|
||||||
STSBuf * pTSbuf = pInfo->pTSBuf;
|
STSBuf * pTSbuf = pInfo->pTSBuf;
|
||||||
|
|
||||||
tsBufFlush(pTSbuf);
|
tsBufFlush(pTSbuf);
|
||||||
strcpy(pCtx->aOutputBuf, pTSbuf->path);
|
|
||||||
|
|
||||||
|
*(FILE **)pCtx->aOutputBuf = pTSbuf->f;
|
||||||
|
|
||||||
|
pTSbuf->remainOpen = true;
|
||||||
tsBufDestroy(pTSbuf);
|
tsBufDestroy(pTSbuf);
|
||||||
doFinalizer(pCtx);
|
doFinalizer(pCtx);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2010,6 +2010,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) {
|
||||||
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL);
|
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
if (pRuntimeEnv->pQuery == NULL) {
|
if (pRuntimeEnv->pQuery == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -2021,6 +2022,16 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
qDebug("QInfo:%p teardown runtime env", pQInfo);
|
qDebug("QInfo:%p teardown runtime env", pQInfo);
|
||||||
cleanupResultRowInfo(&pRuntimeEnv->windowResInfo);
|
cleanupResultRowInfo(&pRuntimeEnv->windowResInfo);
|
||||||
|
|
||||||
|
if (isTSCompQuery(pQuery)) {
|
||||||
|
FILE *f = *(FILE **)pQuery->sdata[0]->data;
|
||||||
|
|
||||||
|
if (f) {
|
||||||
|
fclose(f);
|
||||||
|
*(FILE **)pQuery->sdata[0]->data = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (pRuntimeEnv->pCtx != NULL) {
|
if (pRuntimeEnv->pCtx != NULL) {
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||||
|
@ -6949,10 +6960,10 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
|
||||||
* TODO handle the case that the file is too large to send back one time
|
* TODO handle the case that the file is too large to send back one time
|
||||||
*/
|
*/
|
||||||
if (isTSCompQuery(pQuery) && (*numOfRows) > 0) {
|
if (isTSCompQuery(pQuery) && (*numOfRows) > 0) {
|
||||||
struct stat fstat;
|
struct stat fStat;
|
||||||
if (stat(pQuery->sdata[0]->data, &fstat) == 0) {
|
if (fstat(fileno(*(FILE **)pQuery->sdata[0]->data), &fStat) == 0) {
|
||||||
*numOfRows = fstat.st_size;
|
*numOfRows = fStat.st_size;
|
||||||
return fstat.st_size;
|
return fStat.st_size;
|
||||||
} else {
|
} else {
|
||||||
qError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno));
|
qError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno));
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -6968,15 +6979,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
||||||
|
|
||||||
// load data from file to msg buffer
|
// load data from file to msg buffer
|
||||||
if (isTSCompQuery(pQuery)) {
|
if (isTSCompQuery(pQuery)) {
|
||||||
int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666);
|
|
||||||
|
FILE *f = *(FILE **)pQuery->sdata[0]->data;
|
||||||
|
|
||||||
// make sure file exist
|
// make sure file exist
|
||||||
if (FD_VALID(fd)) {
|
if (f) {
|
||||||
uint64_t s = lseek(fd, 0, SEEK_END);
|
off_t s = lseek(fileno(f), 0, SEEK_END);
|
||||||
|
|
||||||
qDebug("QInfo:%p ts comp data return, file:%s, size:%"PRId64, pQInfo, pQuery->sdata[0]->data, s);
|
qDebug("QInfo:%p ts comp data return, file:%p, size:%"PRId64, pQInfo, f, s);
|
||||||
if (lseek(fd, 0, SEEK_SET) >= 0) {
|
if (fseek(f, 0, SEEK_SET) >= 0) {
|
||||||
size_t sz = read(fd, data, (uint32_t) s);
|
size_t sz = fread(data, 1, s, f);
|
||||||
if(sz < s) { // todo handle error
|
if(sz < s) { // todo handle error
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
|
@ -6984,15 +6996,8 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
||||||
UNUSED(s);
|
UNUSED(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
close(fd);
|
fclose(f);
|
||||||
unlink(pQuery->sdata[0]->data);
|
*(FILE **)pQuery->sdata[0]->data = NULL;
|
||||||
} else {
|
|
||||||
// todo return the error code to client and handle invalid fd
|
|
||||||
qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
|
|
||||||
pQuery->sdata[0]->data, strerror(errno));
|
|
||||||
if (fd != -1) {
|
|
||||||
close(fd);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// all data returned, set query over
|
// all data returned, set query over
|
||||||
|
|
|
@ -19,6 +19,8 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
||||||
if (pTSBuf == NULL) {
|
if (pTSBuf == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTSBuf->autoDelete = autoDelete;
|
||||||
|
|
||||||
taosGetTmpfilePath("join", pTSBuf->path);
|
taosGetTmpfilePath("join", pTSBuf->path);
|
||||||
pTSBuf->f = fopen(pTSBuf->path, "w+");
|
pTSBuf->f = fopen(pTSBuf->path, "w+");
|
||||||
|
@ -26,6 +28,10 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
||||||
free(pTSBuf);
|
free(pTSBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!autoDelete) {
|
||||||
|
unlink(pTSBuf->path);
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == allocResForTSBuf(pTSBuf)) {
|
if (NULL == allocResForTSBuf(pTSBuf)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -37,8 +43,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
||||||
|
|
||||||
tsBufResetPos(pTSBuf);
|
tsBufResetPos(pTSBuf);
|
||||||
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
pTSBuf->autoDelete = autoDelete;
|
|
||||||
pTSBuf->tsOrder = order;
|
pTSBuf->tsOrder = order;
|
||||||
|
|
||||||
return pTSBuf;
|
return pTSBuf;
|
||||||
|
@ -49,6 +54,8 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
||||||
if (pTSBuf == NULL) {
|
if (pTSBuf == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTSBuf->autoDelete = autoDelete;
|
||||||
|
|
||||||
tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
|
tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
|
||||||
|
|
||||||
|
@ -129,7 +136,6 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
|
||||||
|
|
||||||
// ascending by default
|
// ascending by default
|
||||||
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
pTSBuf->cur.order = TSDB_ORDER_ASC;
|
||||||
pTSBuf->autoDelete = autoDelete;
|
|
||||||
|
|
||||||
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
|
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
|
||||||
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
|
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
|
||||||
|
@ -147,8 +153,10 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
|
||||||
|
|
||||||
tfree(pTSBuf->pData);
|
tfree(pTSBuf->pData);
|
||||||
tfree(pTSBuf->block.payload);
|
tfree(pTSBuf->block.payload);
|
||||||
|
|
||||||
fclose(pTSBuf->f);
|
if (!pTSBuf->remainOpen) {
|
||||||
|
fclose(pTSBuf->f);
|
||||||
|
}
|
||||||
|
|
||||||
if (pTSBuf->autoDelete) {
|
if (pTSBuf->autoDelete) {
|
||||||
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
|
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
|
||||||
|
|
Loading…
Reference in New Issue