Skip to content

Commit

Permalink
Remove listeners and ensure memory cleanup in CSV processing (#300)
Browse files Browse the repository at this point in the history
In the item creation and CSV processing methods in server controllers, code enhancements were applied to improve memory management. Listeners on csvStream are now explicitly removed upon completion or error to clean up resources. This change also assures that the temporary buffer for processing CSV monitoring data is wiped when the procedure ends.
  • Loading branch information
ludeknovy authored Mar 1, 2024
1 parent 3fb4151 commit 62d9852
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/server/controllers/item/create-item-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export const createItemController = (req: IGetUserAuthInfoRequest, res: Response
logger.info(`Starting KPI file streaming and saving to db, item_id: ${itemId}`)
const parsingStart = Date.now()

await processMonitoringCsv(monitoringFileName, itemId)
processMonitoringCsv(monitoringFileName, itemId)
const BUFFER_SIZE = 10000
const csvStream = fs.createReadStream(kpiFilename)
.pipe(csv.parse({ headers: true }))
Expand Down Expand Up @@ -167,12 +167,15 @@ export const createItemController = (req: IGetUserAuthInfoRequest, res: Response

} catch(onEndError) {
await handleError(itemId, kpiFilename, onEndError)
} finally {
csvStream.removeAllListeners()
}
})
.on("error", async (processingError) => {
logger.info(`File processing was aborted because of an error, item_id: ${itemId}`)
tempBuffer = null
await handleError(itemId, kpiFilename, processingError)
csvStream.removeAllListeners()
})
} catch(e) {
logger.error(e)
Expand Down
4 changes: 4 additions & 0 deletions src/server/controllers/item/utils/process-monitoring-csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ export const processMonitoringCsv = (filename: string, itemId: string) => {
.on("end", async (rowCount: number) => {
try {
await db.none(pg.helpers.insert(tempBuffer, columnSet))
tempBuffer = null

fs.unlinkSync(filename)

logger.info(`Parsed ${rowCount} monitoring records`)
} catch(error) {
logger.error(`Error while processing monitoring data, itemId - ${itemId}: ${error}`)
} finally {
csvStream.removeAllListeners()
}
})
.on("error", (error) => {
logger.error(`Not valid monitoring csv file provided: ${error}`)
csvStream.removeAllListeners()
})
}

0 comments on commit 62d9852

Please sign in to comment.