Collector Preprocessing

The collector gathers log data, analyzes it as specified, and then sends it to the database. These features alone do not give much flexibility in handling collected data, so Machbase provides a preprocessing framework that lets you process collected data with Python.

Collector Preprocessing Framework

Environment for Preprocessing

To remain compatible with the preprocessing framework, it is strongly recommended to use the Python interpreter bundled with the package. Python is included in the webadmin component of the package; version 2.6 is used and can be found in $MACH_COLLECTOR_HOME/webadmin/flask/Python/bin. When using additional libraries, run their setup scripts with this interpreter to avoid problems caused by version differences between Python installations. Because the bundled Python is not the system default, you must register the library path of any added modules yourself. Since the existing Python path cannot be reused, the paths are registered in the USER_PREPROCESS_LIB_PATH environment variable in the usual <path>:<path> format. When additional paths must be appended to an existing value, separate them with the ":" symbol.
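For example, two module paths could be registered as follows (the directory names below are placeholders chosen for this illustration, not actual Machbase paths):

```shell
# Hypothetical module paths for illustration only.
export USER_PREPROCESS_LIB_PATH=/opt/pylibs/moduleA
# Append a second path, separated by ":".
export USER_PREPROCESS_LIB_PATH=$USER_PREPROCESS_LIB_PATH:/opt/pylibs/moduleB
echo "$USER_PREPROCESS_LIB_PATH"
```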

Order of Preprocessing

This section describes the order of preprocessing during log conversion. To explain how it works, the diagram and concepts are borrowed from User-defined Log Collection.

Message Preprocessing

When data is appended to the log source file at the bottom of the diagram, each piece of log data is divided into log units. We will temporarily call such a unit the origin_msg. Each origin_msg goes through the process shown above, one at a time. In the diagram, the first message inserted is "Aug 19 15:37:12 localhost NetworkManager[1340]: (eth1): bringing up device." The origin_msg is divided into tokens by regular-expression parsing; this step is called message parsing. The first preprocessing step can be performed on origin_msg before message parsing. If origin_msg is modified, exercise caution, because the modified origin_msg is what gets parsed.

If origin_msg is modified in a way that no longer matches the regular expression specified in the rgx file, the data may be parsed incorrectly.
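To illustrate message parsing, the sample origin_msg above can be split into tokens like this. Note that the pattern below is a plain Python regular expression written for this example only; the actual rgx file uses Machbase's own rule syntax.

```python
# Illustration of message parsing: split one syslog line into tm / host / msg tokens.
import re

# Example pattern (not actual rgx syntax): timestamp, hostname, then the rest.
rule = re.compile(r"^(\w+ +\d+ \d+:\d+:\d+) (\S+) (.*)$")

origin_msg = "Aug 19 15:37:12 localhost NetworkManager[1340]: (eth1): bringing up device."
m = rule.match(origin_msg)
tokens = {"tm": m.group(1), "host": m.group(2), "msg": m.group(3)}
print(tokens["host"])  # the second token, mapped to the "host" column
```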

Column Preprocessing

Once the message is parsed, the tokens marked as meaningful in the rgx file are stored in the database. At this point, a second preprocessing step can be performed before the tokens are transferred to the database. Each token carries the field name specified in the rgx file and can be read or modified. Even if origin_msg is modified at this stage, only the (possibly modified) token values are stored in the database.

If a token is given data of a different type than the one specified in the rgx file, an error may occur.

Refer to the Sample Script below for detailed information.

Preprocessing Script

Preprocessing must be written as a Python script. To avoid errors such as typos, it is recommended to start from "custom.py", which is provided by default, and adapt it to your environment. See below for its contents.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg")
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS;
    def __del__(self):
        return

Definition of Return Value

The collector behaves differently depending on the return values of the preprocessing functions. The values returned to the collector are tuples in the (code, message) format.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg")
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

As shown above, four return values are defined: PRS_SUCCESS, PRS_SUCCESS_INFO, PRS_SUCCESS_SKIP, and PRS_FAILURE. The collector acts on the first element (the code) of the tuple.

PRS_SUCCESS (0) has no effect on the collector. PRS_SUCCESS_INFO (1) continues processing and records the returned info message in a trace file. PRS_SUCCESS_SKIP (2) skips the remaining collector logic for the current log and moves on to the next log data. PRS_FAILURE (-1) records the returned error message in a trace file and skips to the next message.

In short, you control the flow with PRS_SUCCESS and PRS_SUCCESS_SKIP, and leave a log trail with PRS_SUCCESS_INFO and PRS_FAILURE.
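The dispatch described above can be sketched as follows. This is not the collector's internal code, only an illustration of how the (code, message) tuples steer processing; the `handle` helper and its return strings are invented for this example.

```python
# Sketch: how a collector could act on the (code, message) return tuples.
PRS_SUCCESS      = ( 0, None )
PRS_SUCCESS_INFO = ( 1, "Info Msg")
PRS_SUCCESS_SKIP = ( 2, None )
PRS_FAILURE      = (-1, "Error Msg")

def handle(result, trace):
    code, message = result
    if code == 1:           # PRS_SUCCESS_INFO: keep processing, log the info message
        trace.append(message)
        return "process"
    if code == 2:           # PRS_SUCCESS_SKIP: drop this log, move to the next one
        return "skip"
    if code < 0:            # PRS_FAILURE: log the error message, skip to the next message
        trace.append(message)
        return "skip"
    return "process"        # PRS_SUCCESS: no effect on the collector

print(handle(PRS_SUCCESS_SKIP, []))  # → skip
```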

Class Definition

The collector runs by calling predefined classes and functions in the Python script. The example below shows the role of each function, when each function is called, and the "dict" variable passed to the functions. If the names of the classes or functions are changed, execution fails, so exercise caution when modifying the file.

class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS;
    def __del__(self):
        return

The name of the predefined class is "mach_preprocess". Because it is defined as a class, variables can be shared between functions through the "self" instance. __init__ is the Python constructor and __del__ is the Python destructor: __init__ runs once when the collector process starts, and __del__ runs once when the collector process terminates. Declare variables to be used globally in __init__, and free allocated resources in __del__ when the collector terminates. Both functions simply return None.

Predefined functions are "mach_msg_preprocess" and "mach_column_preprocess". See below for details.

mach_msg_preprocess

This function is executed once for each input message after it has been divided into a unit log. Because it is called before message parsing, it receives the collector-related metadata and the original message, "origin_msg". The metadata consists of the table name, the collection type, the name of the currently running collector, and the data collection source. This metadata is provided only as context for writing preprocessing scripts; changes to it are not applied to the collector. origin_msg is the unit log to be analyzed, and changes made to it are applied to the collector. Exercise caution: if origin_msg is changed so that it no longer matches the regular expression specified in the rgx file, parsing may fail.

Table 1. Dictionary

Key             Value                                    Changes applied (O/X)
table_name      Name of the table                        X
collect_type    Type of data collection                  X
collector_name  Name of the currently running collector  X
data_source     Path of data collection                  X
origin_msg      Raw log data                             O

Because this function is called before message parsing, which takes most of the collector's processing time, returning PRS_SUCCESS_SKIP here gives the best performance. If skippable messages can be identified at this stage, it is better to handle them in mach_msg_preprocess.

mach_column_preprocess

Once an input message is parsed, it is divided into tokens, and the mach_column_preprocess function is called. The tokens mapped to table columns in the rgx file are available here. As with "mach_msg_preprocess", metadata is provided for scripting, but changes to origin_msg are no longer applied to the collector because message parsing has already been done.

Table 2. Dictionary

Key             Value                                    Changes applied (O/X)
table_name      Name of the table                        X
collect_type    Type of data collection                  X
collector_name  Name of the currently running collector  X
data_source     Path of data collection                  X
origin_msg      Raw log data                             X
column_name     The nth column token                     O

Sample Script

Sample scripts are provided by default and are based on syslog. The sample templates can be found in the "$MACH_COLLECTOR_HOME/collector" directory. Let us see how to configure a preprocessing script using syslog.tpl (a sample template).

###############################################################################
# Copyright of this product 2013-2023,
# Machbase Inc. or its subsidiaries.
# All Rights reserved
###############################################################################
#
# This file is for Machbase collector template file.
#
###################################################################
# Collect setting
###################################################################

COLLECT_TYPE=FILE
LOG_SOURCE=/var/log/syslog

###################################################################
# Process setting
###################################################################

REGEX_PATH=syslog.rgx
PREPROCESS_PATH=script_path

###################################################################
# Output setting
###################################################################
DB_TABLE_NAME = "syslogtable"
DB_ADDR = "127.0.0.1"
DB_PORT = 5656
DB_USER = "SYS"
DB_PASS = "MANAGER"
#
# 0: Direct insert
# 1: Prepared insert
# 2: Append
APPEND_MODE=2
#
# 0: None, just append.
# 1: Truncate.
# 2: Try to create table. If table already exists, warn it and proceed.
# 3: Drop and create.
CREATE_TABLE_MODE=2

The template setting that specifies the preprocessing script is PREPROCESS_PATH. Enter the absolute path of the script to activate it.

If the preprocessing script is located in the default path, the $MACH_COLLECTOR_HOME/collector/preprocess directory, entering just the file name is enough.

SKIP

This script skips data entry when the message contains a word you want to exclude. Since it is provided in the default path, $MACH_COLLECTOR_HOME/collector/preprocess, adding PREPROCESS_PATH=skip.py to the template file is enough to run it.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        if dict['origin_msg'].find("CMD") != -1: # Search for "CMD"
            return PRS_SUCCESS_SKIP              # Skip if "CMD" is included
        else:
            return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        return PRS_SUCCESS;
    def __del__(self):
        return

#Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    print pre_obj.mach_msg_preprocess(dict)

    dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    print pre_obj.mach_msg_preprocess(dict)

The "mach_msg_preprocess" function receives the not-yet-parsed contents of origin_msg through the dict variable and checks whether it contains "CMD". If "CMD" is found, the message is skipped. The code from "if __name__ == "__main__":" onward tests whether the script operates properly. Refer to the Preprocessing Script Test section for details.

REPLACE

After message parsing is complete, this script replaces the string "CRON" in the "msg" column with "cron-execute". It is also provided in the "$MACH_COLLECTOR_HOME/collector/preprocess" directory, so adding PREPROCESS_PATH=replace.py to the template file is enough to run it.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        dict['msg'] = dict['msg'].replace("CRON", "cron-execute") # Replace the string
        return PRS_SUCCESS;
    def __del__(self):
        return

#Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"tm":"Jul 16 07:39:01", "host":"mach-Precision-T1700", "msg":"CRON[1377]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    (code, msg) = pre_obj.mach_column_preprocess(dict);
    if code >= 0:
        print dict
    else:
        print msg

After parsing, the original message is divided into tokens such as tm, host, and msg; "mach_column_preprocess" then replaces "CRON" with "cron-execute" in dict['msg']. The code from "if __name__ == "__main__":" onward tests whether the script operates properly. Refer to the Preprocessing Script Test section for details.

TRACE

The TRACE script records the contents of the dict objects passed into the "mach_msg_preprocess" and "mach_column_preprocess" functions. Since the script is provided in the default path "$MACH_COLLECTOR_HOME/collector/preprocess", adding PREPROCESS_PATH=trace.py to the template file is enough to run it.

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        self.msg_file = open("/tmp/msg.log", 'a') # Open the trace files
        self.column_file = open("/tmp/column.log", 'a')
        return
    def mach_msg_preprocess(self, dict):
        self.msg_file.write(str(dict)+"\n"); # Write the input argument to a file
        self.msg_file.write("\n");
        return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        self.column_file.write(str(dict)+"\n");
        self.column_file.write("\n");
        return PRS_SUCCESS;
    def __del__(self):
        self.msg_file.close() # Close the trace files
        self.column_file.close()
        return

#Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"origin_msg":"Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    pre_obj.mach_msg_preprocess(dict)
    dict = {"origin_msg":"Jul 16 06:39:01 mach-Precision-T1700 CRON[1149]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))", "tm":"Jul 16 06:39:01", "host":"mach-Precision-T1700", "msg":"CRON[1149]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    pre_obj.mach_column_preprocess(dict)

__init__ and __del__ are put to meaningful use here for the first time. __init__ creates the instance variables "msg_file" and "column_file" through "self": msg_file opens "/tmp/msg.log" and column_file opens "/tmp/column.log". These variables are accessible from the other functions in the script through "self". The mach_msg_preprocess function receives the metadata and origin_msg and records them in the /tmp/msg.log file. The mach_column_preprocess function receives the metadata and column tokens and records them in the /tmp/column.log file. The code from "if __name__ == "__main__":" onward tests whether the script operates properly. Refer to the Preprocessing Script Test section for details.

ODBC

The ODBC script stores search keys and a target table in a database. If the search key is found in the input message, the corresponding data is inserted into the specified target table. It can be run by simply adding PREPROCESS_PATH=pypyodbc_sample.py, as it is provided in "$MACH_COLLECTOR_HOME/collector/preprocess". In pypyodbc_sample.py, code_type values arrive as numbers; the script looks them up in the database via ODBC, replaces the numbers with strings, and then stores the strings in the database.

This sample uses pypyodbc as the ODBC library. pypyodbc is the only ODBC library verified with the collector, so it is recommended when using ODBC.

Currently, ODBC is not supported to access Machbase due to library collision.

import pypyodbc

PRS_SUCCESS       = ( 0, None )
PRS_SUCCESS_INFO  = ( 1, "Info Msg" )
PRS_SUCCESS_SKIP  = ( 2, None )
PRS_FAILURE       = (-1, "Error Msg" )

class mach_preprocess:
    def __init__(self):
        self.con = pypyodbc.connect("DSN=MYSQL") # Predefined MySQL DSN.
        self.cursor = self.con.cursor()
        self.table_name = "error_msg"
        self.test_data_make(); # Create sample data.
        return
    def mach_msg_preprocess(self, dict):
        return PRS_SUCCESS;
    def mach_column_preprocess(self, dict):
        result = self.cursor.execute("select code, msg from %s where code = %d"%(self.table_name, int(dict['code_type']))) # Search related data in the table.
        if result is not None: # When a search result exists.
            dict['code_type'] = result.fetchall()[0][1] # Replace the number with a string.
        else:
            print "failure "+str(dict)
        return PRS_SUCCESS;
    def __del__(self):
        self.cursor.close()
        self.con.close()
        return

    #for test
    def test_data_make(self):
        self.table_check("create table %s (code integer, msg varchar(255))");
        return
    def table_check(self, query):
        self.tables = self.cursor.tables().fetchall()
        self.table_list = []
        for (db, user, table, info, none) in self.tables:
            self.table_list.append(table.upper())
        if self.table_name.upper() in self.table_list: # If the table exists, drop it and create it again.
            self.cursor.execute("drop table %s"%self.table_name)
        self.cursor.execute(query%self.table_name);
        self.insert_error_msg()
        self.cursor.commit()
        return
    def insert_error_msg(self): # Store numeric codes and messages in the table.
        error = ((0, "SUCCESS"), (1, "SUCCESS_WITH_INFO"), (-1, "FAILURE"))
        for (code, msg) in error:
            self.cursor.execute("insert into %s values ( %d, '%s')"%(self.table_name, code, msg))
        return

# Test code
if __name__ == "__main__":
    pre_obj = mach_preprocess()
    pre_obj.test_data_make()
    dict = {"tm":"Jul 16 07:39:01","host":"mach-Precision-T1700","msg":"CRON[1377]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))","code_type":"-1"}
    pre_obj.mach_column_preprocess(dict)
    print dict
    dict = {"tm":"Jul 15 11:31:54","host":"mach-Precision-T1700","msg":"NetworkManager[1340]: <error> [1405391514.205040] [nm-system.c:768] nm_system_iface_get_flags(): (unknown): failed to get interface link object","code_type":"0"}
    pre_obj.mach_column_preprocess(dict)
    print dict
    dict = {"tm":"Jul 15 11:31:54","host":"mach-Precision-T1700","msg":"NetworkManager[1340]: <warn> sysctl: failed to open '/proc/sys/net/ipv6/conf/eth1/use_tempaddr': (2) No file in directory","code_type":"1"}
    pre_obj.mach_column_preprocess(dict)
    print dict

The imported pypyodbc is not a default module, so it must be installed. Download pypyodbc and install it with $MACH_COLLECTOR_HOME/webadmin/flask/Python/bin/python. The installation path is "$MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg". The path of an imported module must be made known to the interpreter, and there are two ways to do this: change the path through the sys module, or provide the library path to the collector through an environment variable. The latter is explained here, as the former is a built-in Python feature. The environment variable the collector uses is USER_PREPROCESS_LIB_PATH; paths are appended in the <path>:<path> format using the ":" symbol, and the preprocessing scripts are then executed with those paths. To use pypyodbc, issue the command below.

export USER_PREPROCESS_LIB_PATH=$MACH_COLLECTOR_HOME/webadmin/flask/Python/lib/python2.7/site-packages/pypyodbc-1.3.3-py2.7.egg

Since only the single pypyodbc path is added above, the ":" symbol was not needed. After issuing the command, pypyodbc can be imported when the "pypyodbc_sample.py" script runs. When there are many error codes, looking them up via ODBC in this way is a better option than hard-coding the replacements in the script. Because the database is accessible via ODBC, a JOIN-like operation can also be implemented in the script.

Preprocessing Script Test

When writing a new script, you should check whether it operates properly. There are two execution methods: direct and indirect. The two use nearly identical test code; the only difference is whether the preprocessing script runs itself or a separate test script calls the preprocessing script.

Direct Execution

Each sample script ships with test code alongside the preprocessing class. The test code verifies proper operation by calling the preprocessing class. See below for the test code used in "skip.py".

if __name__ == "__main__":
    pre_obj = mach_preprocess()
    dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    print pre_obj.mach_msg_preprocess(dict)

    dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    print pre_obj.mach_msg_preprocess(dict)

The test code starts with if __name__ == "__main__":. In Python, when a script is executed directly by the interpreter, the __name__ variable is set to "__main__". Therefore this block does not run when the collector loads the script, but does run when the script is executed by itself. mach_preprocess() returns an instance of mach_preprocess: the constructor __init__ is called and the instance is stored in "pre_obj". Both "mach_msg_preprocess" and "mach_column_preprocess" take the "self" and "dict" variables. self refers to the pre_obj instance and is passed automatically when a method is called. The test can be run by building "dict" in the same format as the actual input data. If it is difficult to create actual input data, register "trace.py" in PREPROCESS_PATH and run the collector; "/tmp/msg.log" and "/tmp/column.log" will then be created, and you can use the dictionary data recorded there.
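If trace.py has already run, the dictionaries it recorded can be turned back into test input with the standard ast module. The helper below is a sketch written for this document, assuming each record in the log is one Python dict literal per line (the format str(dict) produces):

```python
# Sketch: rebuild test dictionaries from lines that trace.py wrote with str(dict).
import ast

def load_dicts(text):
    dicts = []
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("{"):            # skip the blank separator lines
            dicts.append(ast.literal_eval(line))
    return dicts

# In practice the text would come from open("/tmp/msg.log").read().
sample = "{'origin_msg': 'Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary.'}\n\n"
print(load_dicts(sample)[0]["origin_msg"])
```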

Indirect Execution

If you don't want to duplicate the code as in direct execution, or want to reuse an already written script, you can use the indirect execution method. The test script is written the same way as in direct execution, except that it imports the ready-made module and then calls it.

import skip # Import the ready-made script

if __name__ == "__main__":
    pre_obj = skip.mach_preprocess() # When creating the instance, call mach_preprocess through the module.
    dict = {"origin_msg":"Jul 16 07:09:01 mach-Precision-T1700 CRON[1220]: (root) CMD (  [ -x /usr/lib/php5/maxlifetime ] && [ -x /usr/lib/php5/sessionclean ] && [ -d /var/lib/php5 ] && /usr/lib/php5/sessionclean /var/lib/php5 $(/usr/lib/php5/maxlifetime))"}
    print pre_obj.mach_msg_preprocess(dict)

    dict = {"origin_msg":"Jul 16 07:39:31 mach-Precision-T1700 cracklib: no dictionary update necessary."}
    print pre_obj.mach_msg_preprocess(dict)
