ELK收集Oracle审计日志
一、开启审计,记录sql
--Oracle审计
https://max.book118.com/html/2016/1214/72170219.shtm
--开启审计
alter system set audit_sys_operations=TRUE scope=spfile;
alter system set audit_trail=db,extended scope=spfile;
--重启实例
shutdown immediate
startup
--查看审计状态
SYS@orcl>show parameter audit;
NAME TYPE VALUE
------------------------------------ ----------- ------------------------------
audit_file_dest string /u01/app/oracle/admin/orcl/adump
audit_sys_operations boolean TRUE
audit_syslog_level string
audit_trail string DB, EXTENDED
二、迁移审计表空间
--迁移审计表到ogg的表空间
BEGIN
DBMS_AUDIT_MGMT.set_audit_trail_location(
audit_trail_type => DBMS_AUDIT_MGMT.AUDIT_TRAIL_AUD_STD,
audit_trail_location_value => 'OGG');
END;
/
BEGIN
DBMS_AUDIT_MGMT.set_audit_trail_location(
audit_trail_type => DBMS_AUDIT_MGMT.AUDIT_TRAIL_FGA_STD,
audit_trail_location_value => 'OGG');
END;
/
--验证表空间
SELECT table_name, tablespace_name FROM dba_tables WHERE table_name IN ('AUD$', 'FGA_LOG$');
select TABLE_NAME, SEGMENT_NAME, TABLESPACE_NAME from dba_lobs where table_name in ('AUD$', 'FGA_LOG$');
--审计段大小统计
SELECT owner
,table_name
,SUM(decode(seg_type, 'table', size_mb)) tab_size_mb
,SUM(decode(seg_type, 'index', size_mb)) idx_size_mb
,SUM(decode(seg_type, 'lob', size_mb)) lob_size_mb
,SUM(size_mb)
FROM (SELECT /*+ rule */ t.owner
,t.table_name
,SUM(s.bytes) / 1024 / 1024 size_mb
,'table' seg_type
FROM dba_segments s, dba_tables t
WHERE s.owner = t.owner
AND s.segment_name = t.table_name
GROUP BY t.owner, t.table_name
UNION ALL
SELECT /*+ rule */ l.owner
,l.table_name
,SUM(s.bytes) / 1024 / 1024 size_mb
,'lob' seg_type
FROM dba_segments s, dba_lobs l
WHERE s.owner = l.owner
AND s.segment_name = l.segment_name
GROUP BY l.owner, l.table_name
UNION ALL
SELECT /*+ rule */ i.table_owner owner
,i.table_name
,SUM(s.bytes) / 1024 / 1024 size_mb
,'index' seg_type
FROM dba_segments s, dba_indexes i
WHERE s.owner = i.owner
AND s.segment_name = i.index_name
GROUP BY i.table_owner, i.table_name)
WHERE owner = 'SYS'
AND table_name = 'AUD$'
group by owner, table_name;
Oracle审计相关内容参考:
https://blog.csdn.net/demonson/article/details/116146370
三、根据需求设计审计规则
需求:
要求ELK收集现有数据库所有sql语句,但由于DML语句过于频繁,只收集数据库所有DDL语句(不管语句是否执行成功,都收集上);
根据以上需求和上面oracle审计技术文档,判断为语句级审计满足需求: 需要审计项目:
" class="reference-link">
先清空oracle默认开启的审计项
select * from sys.audit$;
--sys用户登录
truncate table audit$;
在执行下面语句级审计DDL
审计语句:
AUDIT DATABASE LINK;
AUDIT INDEX;
AUDIT PROCEDURE;
AUDIT SEQUENCE;
AUDIT SYSTEM GRANT;
AUDIT TABLE;
AUDIT TABLESPACE;
AUDIT TRIGGER;
AUDIT USER;
AUDIT TYPE;
AUDIT VIEW;
AUDIT ALTER SEQUENCE;
AUDIT ALTER TABLE;
重启数据库
shut immediate
startup
涉及到的SQL:
select * from sys.audit$;
select * from sys.aud$;
select * from dba_priv_audit_opts;
select * from dba_stmt_audit_opts;
select * from sys.AUDIT$ a inner join sys.STMT_AUDIT_OPTION_MAP b on a.option#=b.OPTION#;
select userid,userhost, to_char(substr(SQLTEXT,1,3000)) as sql,action#,returncode,to_char(NTIMESTAMP#,'yyyy-mm-dd hh24:mi:ss'),
SQLBIND,SQLTEXT from sys.aud$ t where userid='GGS'order by t.ntimestamp# desc;
取消审计语句:
NOAUDIT DATABASE LINK;
NOAUDIT INDEX;
NOAUDIT PROCEDURE;
NOAUDIT SEQUENCE;
NOAUDIT SYSTEM GRANT;
NOAUDIT TABLE;
NOAUDIT TABLESPACE;
NOAUDIT TRIGGER;
NOAUDIT USER;
NOAUDIT TYPE;
NOAUDIT VIEW;
NOAUDIT ALTER SEQUENCE;
NOAUDIT ALTER TABLE;
取消所有审计语句:
NOAUDIT ALL;
#取消审计等操作若是不生效,测试重新配置审计打开并重启数据库能有效解决
四、ELK抓取审计语句
1.sys用户下创建视图
create or replace view audit_sql_vw as
select to_char(t.NTIMESTAMP# + 1 / 3, 'yyyy-mm-dd hh24:mi:ss') auditime, --审计时间
t.userid as username, --用户名称
t.userhost as hostname, --客户端主机名
b.name as sqltype, --语句类别
t.obj$name as objname, --对象名称
t.returncode, --返回值,0表示执行成功
to_char(substr(t.SQLTEXT, 1, 3000)) as sql --审计sql
from sys.aud$ t
left join sys.AUDIT_ACTIONS b
on t.action# = b.action;
2.编写python,将数据库中审计日志读取到磁盘上
#创建日志目录
su - oracle
mkdir /u01/app/oracle/admin/orcl/audit
vim check_audit.py
#!/usr/bin/python
#coding=utf-8
import os
import cx_Oracle
os.environ['ORACLE_HOME'] = '/u01/app/oracle/product/11.2.0/db_1'
os.environ['ORACLE_SID'] = 'orcl'
os.environ['PATH']
#连接数据库,查视图
def check_audit_qurey():
conn = cx_Oracle.connect('sys','*******','192.168.1.96:14320/orcl',mode=cx_Oracle.SYSDBA)
cursor=conn.cursor()
lists = []
try:
cursor.execute ("select * from audit_sql_vw")
#print("连接成功!")
lists = cursor.fetchall()
except Exception:
print("connenct oracle error,dblink error!",Exception)
finally:
#每次运行脚本后,清空数据库中审计日志,防止占用空间太大
cursor.execute ("truncate table sys.aud$")
cursor.close()
conn.close()
msg = ''
if len(lists):
for i in lists:
msg = msg + ",".join(map(str, i)) + '\n'
#print(msg)
#将文件输出到文件
f=open('/u01/app/oracle/admin/orcl/audit/audit.log','w+')
f.write(msg)
f.close()
def main():
check_audit_qurey()
if __name__ == "__main__":
main()
3.每半小时执行一次脚本
crontab -e
*/30 * * * * /usr/bin/python /u01/app/oracle/admin/orcl/audit/check_audit.py > /u01/app/oracle/admin/orcl/audit/exec_check_audit.log 2>&1
4.安装filebeat读取oracle审计日志
网上下载filebeat的rpm包
rpm -ivh filebeat-7.12.0-x86_64.rpm
warning: filebeat-7.12.0-x86_64.rpm: Header V4 RSA/SHA512 Signature, key ID d88e42b4: NOKEY
Preparing... ########################################### [100%]
1:filebeat ########################################### [100%]
#修改配置文件
vim /etc/filebeat/filebeat.yml
filebeat.inputs:
#oracle_audit.log 收集审计SQL(每半小时会自动触发一次)
- type: log
enabled: true
paths:
- /u01/app/oracle/admin/orcl/audit/audit.log
tags: ["oracle-audit-log"]
multiline.pattern: '\d{4}-\d{2}-\d{2}'
multiline.negate: true
multiline.match: after
fields:
log_source: oracle-audit-log
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
setup.kibana:
output.logstash:
hosts: ["192.168.1.207:55051"]
#启动filebeat
service filebeat start
service filebeat status
service filebeat stop
#日志查看
tail -f /var/log/filebeat/filebeat
5.创建logstash配置文件,重启远程logstash
#配置logstash内置正则字段
vim /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns
添加
ORACLE_TIME (Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s(\d{2})\s(\d{2}):(\d{2}):(\d{2})\s(\d{4})
DB_LOGCONTENT .*
SQL_ID [A-Za-z0-9]+
ORA_MACHINE .*?
ORA_USER .*?
ORA_TYPE .*?
ORA_OBJECT .*?
ORA_SQL .*
logstash配置文件创建
vim /etc/logstash/conf.d/oracle-log.conf
vim /etc/logstash/conf.d/oracle-log.conf
input {
beats {
port => 55051
}
}
filter{
if "oracle-log" in [tags] {
mutate { add_field => { "clienthost" => "%{[beat][hostname]}" } }
grok {
match => {
"message" => "%{ORACLE_TIME:datetime}\n%{DB_LOGCONTENT:logmessage}"
}
}
}
if "oracle-slow-log" in [tags] {
grok {
match => [ "message" , "%{TIMESTAMP_ISO8601:time},%{SQL_ID:sql_id},%{NUMBER:elapsed_time:float},%{NUMBER:cpu_time:float},%{NUMBER:iowait_time:float},%{NUMBER:gets:int},%{NUMBER:reads:int},%{NUMBER:rows:int},%{NUMBER:cluster_wait_time:float},%{NUMBER:execs:int},%{NUMBER:elpe_time:float},%{ORA_MACHINE:machine},%{ORA_USER:username},%{ORA_SQL:sql}" ]
}
date{
match=>["time","YYYY-MM-dd HH:mm:ss"]
target=>"snapshot_time"
}
}
if "oracle-audit-log" in [tags] {
grok {
match => [ "message" , "%{TIMESTAMP_ISO8601:time},%{ORA_USER:user_name},%{ORA_MACHINE:machine},%{ORA_TYPE:sql_type},%{ORA_OBJECT:object_name},%{NUMBER:return_code:int},%{ORA_SQL:sql}" ]
}
date{
match=>["time","YYYY-MM-dd HH:mm:ss"]
target=>"audit_time"
}
}
}
output {
if "oracle-log" in [tags] {
elasticsearch {
hosts => ["192.168.1.207:9200"]
manage_template => false
index => "oracle-log-%{+YYYY.MM}"
}}
if "oracle-slow-log" in [tags] {
elasticsearch {
hosts => ["192.168.1.207:9200"]
manage_template => false
index => "oracle-slow-log-%{+YYYY.MM}"
}
}
if "oracle-audit-log" in [tags] {
elasticsearch {
hosts => ["192.168.1.207:9200"]
manage_template => false
index => "oracle-audit-log-%{+YYYY.MM}"
}
}
}
#重启logstash
systemctl restart logstash
6.kabana创建索引,查看审计状态
kabana创建索引
还没有评论,来说两句吧...