PostgreSqlJson.java
package com.github.isuhorukov.log.watcher;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
import picocli.CommandLine;
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
/**
* The {@code PostgreSqlJson} class is a command-line tool for reading
* PostgreSQL DBMS logs in JSON format and sending them to an OpenTelemetry collector.
* It can optionally use a {@link LogEnricher} to add additional data to the logs.
*
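* <p>
* Example invocation (the jar name, log path, and connection settings are illustrative):
* </p>
* <pre>{@code
* java -jar postgres-log-parser.jar /var/lib/postgresql/data/log \
*     --host localhost --port 5432 --database postgres --user postgres --password
* }</pre>
*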
* @plantUml
* database "PostgreSQL 15+"
* node postgres_log_parser #palegreen
* node "OpenTelemetry collector"
* postgres_log_parser - "PostgreSQL 15+" : watch changes and parse JSON logs
* postgres_log_parser -(0- "OpenTelemetry collector": sending the logs
*/
@CommandLine.Command(mixinStandardHelpOptions = true, versionProvider = VersionProvider.class,
name = "postgres_log_parser",
header = "This program reads PostgreSQL DBMS logs in JSON format and sends them to OpenTelemetry collector")
@Setter
public class PostgreSqlJson implements Callable<Integer>, Closeable {
private static final String DURATION = "duration: ";
private static final String MS = " ms";
private static final String PLAN = "plan:\n";
private static final String QUERY_ID = "query_id";
private static final String JSON_SUFFIX = ".json";
private static final Logger logger = LoggerFactory.getLogger(PostgreSqlJson.class);
private static final Logger cliLogger = LoggerFactory.getLogger("cli");
private static final ObjectMapper mapper = new ObjectMapper();
/**
* Stores the current read position for each log file.
*/
final Map<String, Long> position = new ConcurrentHashMap<>();
/**
* LogEnricher instance to use in log processing.
*/
LogEnricher logEnricher = new EnrichmentOff();
/**
* Path to the directory with PostgreSQL logs in JSON format.
*/
@CommandLine.Parameters(index = "0", description = "Path to the directory with PostgreSQL logs in JSON format")
String watchDir;
/**
* Interval (in seconds) at which the current read position in the log files is saved.
* The value must be in the range from 1 to 1000 seconds.
*/
@CommandLine.Option(names = {"-i", "--save_interval"}, defaultValue = "10",
description = "Interval (in seconds) at which the current read position in the log files is saved. " +
"The value must be in the range from 1 to 1000 seconds")
long saveInterval;
/**
* The host name of the PostgreSQL server.
*/
@CommandLine.Option(names = {"-H", "--host"}, description = "The host name of the PostgreSQL server")
String postgreSqlHost;
/**
* The port number the PostgreSQL server is listening on.
*/
@CommandLine.Option(names = {"-p", "--port"}, defaultValue = "5432", description = "The port number the PostgreSQL server is listening on")
int postgreSqlPort;
/**
* The database name.
*/
@CommandLine.Option(names = {"-d", "--database"}, defaultValue = "postgres", description = "The database name")
String postgreSqlDatabase;
/**
* The database user on whose behalf the connection is being made.
*/
@CommandLine.Option(names = {"-u", "--user"}, defaultValue = "postgres",
description = "The database user on whose behalf the connection is being made")
String postgreSqlUserName;
/**
* Password for the PostgreSQL connection. Defaults to the {@code PGPASSWORD} environment variable.
*/
@CommandLine.Option(names = "--password", arity = "0..1", interactive = true,
description = "The database user's password. Defaults to the PGPASSWORD environment variable")
String postgreSqlPassword = System.getenv("PGPASSWORD");
/**
* Database query cache size.
*/
@CommandLine.Option(names = {"-c", "--max_cache_size"}, defaultValue = "50000",
description = "Database query cache size")
int maximumQueryCacheSize;
/**
* Path to the file used to persist the current processed position in the log files.
* The program requires write access to this file.
*/
@CommandLine.Option(names = {"-lp", "--log_pos_file"}, defaultValue = ".current_log_position",
description = "Path to the file used to persist the current processed position in the log files. " +
"The program requires write access to this file")
String currentLogPositionFile;
@Getter
private WatchService fsWatchService;
/**
* Main method to execute the application.
* Creates a PostgreSqlJson instance, executes the command-line arguments, and terminates the
* JVM with the resulting exit code unless the {@code skipProcessExit} system property is set.
*
* @param args Command line arguments passed to the application.
* @plantUml
* start
* :Create PostgreSqlJson instance;
* :Execute command line arguments;
* if (skipProcessExit is false) then (yes)
* :System.exit with exitCode;
* endif
* stop
*/
@SneakyThrows
public static void main(String[] args) {
try (PostgreSqlJson postgreSqlJson = new PostgreSqlJson()){
int exitCode = new CommandLine(postgreSqlJson).execute(args);
if(!Boolean.getBoolean("skipProcessExit")){
System.exit(exitCode);
}
}
}
/**
* Calls the method responsible for watching PostgreSQL logs.
*
* @return system exit code
* @throws Exception if an error occurs while watching the logs
*/
@Override
public Integer call() throws Exception {
return watchPostgreSqlLogs();
}
/**
* Monitors the PostgreSQL log directory for changes and processes the logs.
* <p>
* This method sets up a WatchService to continuously monitor the specified
* directory for new or modified log files in JSON format. When such a file
* is detected, it is processed to extract relevant log information.
* </p>
*
* <p>
* The method also makes use of a log enricher (if configured) to enhance the log data
* with additional information.
* </p>
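*
* <p>
* Minimal programmatic use, bypassing the command line (the path and values are illustrative;
* note that picocli defaults do not apply when the instance is constructed directly):
* </p>
* <pre>{@code
* try (PostgreSqlJson parser = new PostgreSqlJson()) {
*     parser.setWatchDir("/var/lib/postgresql/data/log");
*     parser.setSaveInterval(10);
*     parser.setCurrentLogPositionFile(".current_log_position");
*     parser.watchPostgreSqlLogs(); // blocks while watching the directory
* }
* }</pre>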
*
* @return system exit code
*
* @throws IOException if an I/O error occurs initializing the watcher or processing the logs.
* @throws InterruptedException if the watch service is interrupted while waiting for events.
* @plantUml
* start
* if (watchDir is null or empty) then (yes)
* :log error;
* stop
* endif
* :create File from watchDir;
* if (directory not exists) then (yes)
* :log error;
* stop
* endif
* if (not a directory) then (yes)
* :log error;
* stop
* endif
* if (saveInterval invalid) then (yes)
* :log error;
* stop
* endif
* :initialize log enricher;
* :setup position file tasks;
* :perform initial log import;
* :create watch service;
* while (watchService active)
* :wait for watch events;
* if (file ends with .json) then (yes)
* :read JSON log;
* endif
* :reset watch key;
* endwhile
* stop
*/
public int watchPostgreSqlLogs() throws IOException, InterruptedException {
if(watchDir==null || watchDir.trim().isEmpty()){
cliLogger.error("Path to PostgreSQL log directory expected");
return 1;
}
File sourceDirectory = new File(watchDir);
if(!sourceDirectory.exists()) {
cliLogger.error("PostgreSQL directory '{}' with JSON logs not exist", watchDir);
return 1;
}
if(!sourceDirectory.isDirectory()){
cliLogger.error("Path '{}' is not directory", watchDir);
return 1;
}
if(saveInterval<=0 || saveInterval>1000){
cliLogger.error("saveInterval must be between 1 and 1000 sec. Actual value {}", saveInterval);
return 1;
}
initLogEnricher();
positionFileTasks();
initialLogImport(sourceDirectory);
Path dirToWatch = Paths.get(watchDir);
fsWatchService = getWatchService();
try (WatchService watchService = fsWatchService) {
registerWatchEvent(dirToWatch, watchService);
WatchKey key;
while ((key = watchService.take()) != null) {
for (WatchEvent<?> event : key.pollEvents()) {
String fileName = event.context().toString();
if(!fileName.endsWith(JSON_SUFFIX)){
continue;
}
readJsonLog(new File(watchDir, fileName));
Thread.yield();
}
key.reset();
}
}
return 0;
}
/**
* Initializes the log enricher for PostgreSQL logs.
* <p>
* This method creates an instance of {@link LogEnricherPostgreSql} with the specified
* PostgreSQL host, port, database, username, password, and query cache size.
* </p>
* <p>
* If the PostgreSQL host is not set or is empty, the method skips enricher creation and
* instantiates {@link EnrichmentOff} instead of {@link LogEnricherPostgreSql}. The same
* fallback is applied, with an error logged, if the enricher cannot be created or the
* {@code pg_stat_statements} extension is unavailable.
* </p>
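* <p>
* Enrichment requires the {@code pg_stat_statements} extension in the target database,
* which can be installed with:
* </p>
* <pre>{@code
* CREATE EXTENSION pg_stat_statements;
* }</pre>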
*
* @plantUml
* start
* if (postgreSqlHost is null or empty) then (yes)
* :initialize enrichment off;
* else (no)
* :create LogEnricherPostgreSql;
* if (failed to create LogEnricherPostgreSql) then (yes)
* :log error;
* :initialize enrichment off;
* else (no)
* :check extension;
* if (extension is not available) then (yes)
* :log error;
* :initialize enrichment off;
* endif
* endif
* endif
* stop
*/
void initLogEnricher() {
if(postgreSqlHost != null && !postgreSqlHost.isEmpty()){
try {
logEnricher = new LogEnricherPostgreSql(postgreSqlHost, postgreSqlPort, postgreSqlDatabase, postgreSqlUserName, postgreSqlPassword, maximumQueryCacheSize);
} catch (Exception e) {
cliLogger.error("Failed to use log enricher {} for postgres, continuing without log enrichment",
LogEnricherPostgreSql.class.getSimpleName(), e);
logEnricher = new EnrichmentOff();
}
try {
logEnricher.getStatement("0");
cliLogger.info("{} up and running", LogEnricherPostgreSql.class.getSimpleName());
} catch (Exception e) {
cliLogger.error("Make sure the extension is available in the database: CREATE EXTENSION pg_stat_statements;\n" +
"https://www.postgresql.org/docs/current/pgstatstatements.html", e);
logEnricher = new EnrichmentOff();
}
} else {
logEnricher = new EnrichmentOff();
}
}
/**
* Registers the specified directory with the given watch service to listen for file creation and modification events.
*
* @param dirToWatch the directory path to be watched
* @param watchService the watch service to which the directory should be registered
* @throws IOException if an I/O error occurs
*/
protected void registerWatchEvent(Path dirToWatch, WatchService watchService) throws IOException {
dirToWatch.register(watchService, ENTRY_CREATE, ENTRY_MODIFY);
}
/**
* Returns a new {@link WatchService} instance that can be used to watch
* objects for changes in a file tree.
*
* @return a new {@link WatchService} instance
* @throws IOException if an I/O error occurs
*/
protected WatchService getWatchService() throws IOException {
return FileSystems.getDefault().newWatchService();
}
/**
* Performs the initial import of PostgreSQL JSON log files from the specified directory.
* <p>
* This method reads all JSON log files in the given directory, sorts them by name,
* and processes each file by calling the {@link #readJsonLog(File)} method.
* </p>
*
* @param sourceDirectory the directory containing the PostgreSQL JSON log files to be imported.
* @throws IOException if an I/O error occurs while reading the log files.
* @plantUml
* start
* :Get all JSON files from source directory;
* if (Are there JSON files?) then (yes)
* :Sort files by name;
* while (More files to process?)
* :Read next JSON log file;
* endwhile
* endif
* stop
*/
protected void initialLogImport(File sourceDirectory) throws IOException {
File[] jsonLogs = sourceDirectory.listFiles(pathname -> pathname.getName().endsWith(JSON_SUFFIX));
if(jsonLogs!=null && jsonLogs.length>0){
Arrays.sort(jsonLogs, Comparator.comparing(File::getName));
for(File jsonLog: jsonLogs){
readJsonLog(jsonLog);
}
}
}
/**
* Parses a single line of PostgreSQL JSON log and logs it according to its severity level.
* <p>
* This method reads a log line in JSON format, determines the severity level of the log message,
* and logs the message using the appropriate logging level. If the corresponding logging level
* (TRACE, DEBUG, INFO, WARN, ERROR) is not enabled, the method will return immediately without
* further processing the log line.
* </p>
* <p>
* The method uses the {@code error_severity} field from the JSON log to determine the logging level
* and creates a logging event with additional key-value pairs extracted from the JSON log.
* It also handles various log message formats, adjusting or adding specific log attributes such as
* duration, plan, parse, and bind if they are present in the message.
* </p>
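* <p>
* A typical input line looks like the following (fields abbreviated; values are illustrative):
* </p>
* <pre>{@code
* {"timestamp":"2024-05-01 12:00:00.000 UTC","error_severity":"LOG",
*  "message":"duration: 1.234 ms","query_id":1147616880456321454}
* }</pre>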
* @param line the log line in JSON format to be parsed.
* @param logName the name of the log file from which the line was read.
* @plantUml
* start
* :read line as JSON;
* :get severity level;
* if (severity is not enabled) then (yes)
* stop
* endif
* :get fields from JSON;
* :get message;
* if (message contains enricherApplicationName) then (yes)
* stop
* endif
* :create logging event;
* if (message starts with DURATION) then (yes)
* :parse duration;
* else (no)
* :set message in logging event;
* if (message starts with "statement: ") then (yes)
* :add statement key-value;
* elseif (message starts with "execute") then (yes)
* :add execute key-value;
* endif
* endif
* :process log record attributes;
* if (logging event builder is null) then (yes)
* stop
* endif
* :add fileName to logging event;
* :log event;
* stop
*/
@SneakyThrows
void parseLogLine(String line, String logName) {
JsonNode jsonNode = mapper.readTree(line);
Level severity = getSeverity(jsonNode.at("/error_severity").asText());
if (skipProcessing(severity)) return;
Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
String message = jsonNode.at("/message").asText();
if(logEnricher.enricherApplicationName()!=null && message.contains(logEnricher.enricherApplicationName())){
return;
}
LoggingEventBuilder loggingEventBuilder = logger.atLevel(severity);
if(message.startsWith(DURATION)){
loggingEventBuilder = parseDuration(message, loggingEventBuilder);
} else {
loggingEventBuilder = loggingEventBuilder.setMessage(message);
if(message.startsWith("statement: ")){
loggingEventBuilder = loggingEventBuilder.addKeyValue("statement", true);
} else
if(message.startsWith("execute")){
loggingEventBuilder = loggingEventBuilder.addKeyValue("execute", true);
}
}
loggingEventBuilder = processLogRecordAttributes(fields, loggingEventBuilder);
if (loggingEventBuilder == null) return; //skip the enricher's own log records (matched by application_name)
loggingEventBuilder = loggingEventBuilder.addKeyValue("fileName", logName);
loggingEventBuilder.log();
}
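/**
* Checks whether the given severity level is disabled in the underlying logger,
* in which case the log line can be skipped without further parsing.
*
* @param severity the mapped SLF4J level of the log record
* @return {@code true} if the level is disabled and processing should be skipped
*/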
private boolean skipProcessing(Level severity) {
switch (severity){
case TRACE:
if(!logger.isTraceEnabled()) return true;
break;
case DEBUG:
if(!logger.isDebugEnabled()) return true;
break;
case INFO:
if(!logger.isInfoEnabled()) return true;
break;
case WARN:
if(!logger.isWarnEnabled()) return true;
break;
case ERROR:
if(!logger.isErrorEnabled()) return true;
break;
}
return false;
}
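/**
* Copies the remaining JSON log fields into the logging event as key-value attributes,
* skipping the already handled {@code message} field and empty {@code query_id} values,
* and enriching records that carry a {@code query_id} with the statement text.
*
* @return the updated builder, or {@code null} if the record originates from the enricher
* itself (matched by {@code application_name}) and should be dropped
*/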
private LoggingEventBuilder processLogRecordAttributes(Iterator<Map.Entry<String, JsonNode>> fields,
LoggingEventBuilder loggingEventBuilder) {
while (fields.hasNext()) {
Map.Entry<String, JsonNode> entry = fields.next();
String key = entry.getKey();
String value = entry.getValue().asText();
if("message".equals(key) ||
(QUERY_ID.equals(key) && "0".equals(value)) //skip empty query_id
){
continue;
}
if("application_name".equals(key) && value.equals(logEnricher.enricherApplicationName())){
return null;
}
if(QUERY_ID.equals(key)){
loggingEventBuilder = enrichWithStatementText(value, loggingEventBuilder);
}
JsonNode entryValue = entry.getValue();
loggingEventBuilder = loggingEventBuilder.addKeyValue(key, getValue(entryValue));
}
return loggingEventBuilder;
}
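/**
* Adds the SQL statement text resolved by the {@link LogEnricher} for the given
* {@code query_id} as a {@code statement_text} attribute, when available.
*/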
private LoggingEventBuilder enrichWithStatementText(String queryId, LoggingEventBuilder loggingEventBuilder) {
String statement = logEnricher.getStatement(queryId);
if(statement!=null && !statement.isEmpty()){
loggingEventBuilder = loggingEventBuilder.addKeyValue("statement_text", statement);
}
return loggingEventBuilder;
}
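/**
* Extracts the duration in milliseconds from messages of the form
* {@code duration: 1.234 ms ...} and attaches it as a {@code duration} attribute,
* together with an execution {@code plan} or {@code parse}/{@code bind} markers
* when present. The message itself is cleared, since its content is carried
* by the attributes.
*/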
private LoggingEventBuilder parseDuration(String message, LoggingEventBuilder loggingEventBuilder) {
int msEndIndex = message.indexOf(MS);
double duration = Double.parseDouble(message.substring(DURATION.length(), msEndIndex));
loggingEventBuilder = loggingEventBuilder.addKeyValue("duration", duration);
int planIdx = message.indexOf(PLAN);
if(planIdx!=-1){
String plan = message.substring(planIdx + PLAN.length());
loggingEventBuilder = loggingEventBuilder.addKeyValue("plan", plan);
} else
if(message.indexOf(" parse ",msEndIndex+2)>-1){
loggingEventBuilder = loggingEventBuilder.addKeyValue("parse", true);
} else
if(message.indexOf(" bind ",msEndIndex+2)>-1){
loggingEventBuilder = loggingEventBuilder.addKeyValue("bind", true);
}
loggingEventBuilder = loggingEventBuilder.setMessage("");
return loggingEventBuilder;
}
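/**
* Converts a JSON field to an {@link Integer}, {@link Long}, or {@link String} value
* so that numeric attributes keep their type in the logging event.
*/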
private static Object getValue(JsonNode entryValue) {
switch (entryValue.getNodeType()){
case NUMBER:
switch (entryValue.numberType()){
case INT:
return entryValue.asInt();
case LONG:
return entryValue.asLong();
default:
return entryValue.asText();
}
case STRING:
default:
return entryValue.asText();
}
}
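/**
* Maps a PostgreSQL severity level (DEBUG1..DEBUG5, LOG, INFO, NOTICE, WARNING,
* ERROR, FATAL, PANIC) to the corresponding SLF4J {@link Level}; unknown values
* map to {@link Level#TRACE}.
*/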
private static Level getSeverity(String severity) {
//https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-SEVERITY-LEVELS
switch (severity){
case "DEBUG":
case "DEBUG5":
case "DEBUG4":
case "DEBUG3":
case "DEBUG2":
case "DEBUG1": return Level.DEBUG;
case "LOG":
case "INFO":
case "NOTICE": return Level.INFO;
case "WARNING": return Level.WARN;
case "ERROR":
case "FATAL":
case "PANIC": return Level.ERROR;
default:
return Level.TRACE;
}
}
/**
* Manages the tasks related to handling positional information for PostgreSQL log files.
* <p>
* This method restores the positional information of log files to ensure that logs are
* correctly read and parsed from the last known position, and sets up the machinery that
* keeps this information up to date so that log parsing can resume correctly after
* restarts or interruptions.
* </p>
* <p>
* The method performs the following tasks:
* </p>
* <ul>
* <li>Loading previously saved positions from the position file into the position map.</li>
* <li>Scheduling a periodic task that saves the current positions.</li>
* <li>Registering a shutdown hook that saves positions on JVM exit.</li>
* </ul>
* <p>
* This method is crucial for log processing scenarios where it is important not to miss any
* log entries and to avoid reprocessing entries that have already been handled.
* </p>
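* <p>
* The position file is a JSON map from log file name to byte offset, for example
* (the file name and offset are illustrative):
* </p>
* <pre>{@code
* {"postgresql-2024-05-01_000000.json": 8192}
* }</pre>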
*
* @return the shutdown hook thread that persists positions on JVM shutdown
* @throws IOException if an I/O error occurs while managing the log file positions.
* @plantUml
* start
* if (currentPositionFile exists and not empty?) then (yes)
* :Read position from file and put into map;
* endif
* :Schedule periodic position saving task;
* :Create shutdown hook thread;
* :Add shutdown hook to Runtime;
* :Return shutdown hook thread;
* stop
*/
protected Thread positionFileTasks() throws IOException {
File currentPositionFile = new File(currentLogPositionFile);
if(currentPositionFile.exists() && currentPositionFile.length()>0) {
position.putAll(mapper.readValue(currentPositionFile,
new TypeReference<ConcurrentHashMap<String, Long>>() {}));
}
new Timer("LogPositionSaver", true).schedule(new TimerTask() {
@Override
public void run() {
saveLogFilesPosition();
}
}, TimeUnit.SECONDS.toMillis(saveInterval), TimeUnit.SECONDS.toMillis(saveInterval));
Thread savePositionShutdownHook = new Thread(this::saveLogFilesPosition);
Runtime.getRuntime().addShutdownHook(savePositionShutdownHook);
return savePositionShutdownHook;
}
/**
* Saves the current read positions of PostgreSQL log files.
* <p>
* This method writes the current positions of log files to the configured position file,
* ensuring accurate resume points after restarts or interruptions. It is synchronized to
* prevent concurrent writes from the periodic timer task and the shutdown hook.
* </p>
*
* @plantUml
* start
* if (position is empty?) then (yes)
* stop
* endif
* :Create FileOutputStream for currentLogPositionFile;
* :Write new TreeMap(position) to the stream using mapper;
* if (Exception) then (catch)
* :Log error with cliLogger;
* endif
* stop
*/
protected synchronized void saveLogFilesPosition() {
try {
if(position.isEmpty()){
return;
}
try (FileOutputStream currentPositionFile = new FileOutputStream(currentLogPositionFile)){
mapper.writeValue(currentPositionFile, new TreeMap<>(position));
}
} catch (Exception e) {
cliLogger.error("Unable to save current log position", e);
}
}
/**
* Reads a JSON log file and processes each line.
* This method reads through the specified JSON log file starting from the last saved position
* and parses each line according to the log's severity level. After all available lines have
* been processed, the saved position is advanced to the end of the file for accurate resumption.
*
* @param jsonLog the JSON log file to read
* @throws IOException if an error occurs while reading the log file
* @plantUml
* start
* :Get jsonLogName from jsonLog;
* :Compute from position using jsonLogName;
* if (jsonLog.length() == 0 or jsonLog.length() <= from) then (yes)
* stop
* endif
* :Open RandomAccessFile randomAccessJson in read mode;
* :Seek randomAccessJson to position from;
* while (line != null)
* :Read line from randomAccessJson;
* :parseLogLine(line, jsonLogName);
* endwhile
* :Update position with jsonLog.length();
* :Close randomAccessJson;
* stop
*/
void readJsonLog(File jsonLog) throws IOException {
String jsonLogName = jsonLog.getName();
long from = position.computeIfAbsent(jsonLogName, name -> 0L);
if(jsonLog.length() == 0 || jsonLog.length() <= from){
return;
}
try (RandomAccessFile randomAccessJson = new RandomAccessFile(jsonLog, "r")) {
randomAccessJson.seek(from);
String line;
while ((line = randomAccessJson.readLine())!= null) {
parseLogLine(line, jsonLogName);
}
}
position.put(jsonLogName, jsonLog.length());
}
@Override
public void close() throws IOException {
logEnricher.close();
}
}