ELK收集Oracle审计日志

迈不过友情╰ 2023-01-17 03:22 271阅读 0赞

一、开启审计,记录sql

  1. --Oracle审计
  2. https://max.book118.com/html/2016/1214/72170219.shtm
  3. --开启审计
  4. alter system set audit_sys_operations=TRUE scope=spfile;
  5. alter system set audit_trail=db,extended scope=spfile;
  6. --重启实例
  7. shutdown immediate
  8. startup
  9. --查看审计状态
  10. SYS@orcl>show parameter audit;
  11. NAME TYPE VALUE
  12. ------------------------------------ ----------- ------------------------------
  13. audit_file_dest string /u01/app/oracle/admin/orcl/adump
  14. audit_sys_operations boolean TRUE
  15. audit_syslog_level string
  16. audit_trail string DB, EXTENDED

二、迁移审计表空间

  1. --迁移审计表到ogg的表空间
  -- Move the standard audit trail to tablespace OGG.
  -- Kept as its own anonymous block so it runs independently of the block below.
  DECLARE
    c_target_tablespace CONSTANT VARCHAR2(30) := 'OGG';
  BEGIN
    dbms_audit_mgmt.set_audit_trail_location(
      audit_trail_type           => dbms_audit_mgmt.audit_trail_aud_std,
      audit_trail_location_value => c_target_tablespace
    );
  END;
  /
  -- Move the fine-grained audit (FGA) trail to the same tablespace.
  DECLARE
    c_target_tablespace CONSTANT VARCHAR2(30) := 'OGG';
  BEGIN
    dbms_audit_mgmt.set_audit_trail_location(
      audit_trail_type           => dbms_audit_mgmt.audit_trail_fga_std,
      audit_trail_location_value => c_target_tablespace
    );
  END;
  /
  14. --验证表空间
  15. SELECT table_name, tablespace_name FROM dba_tables WHERE table_name IN ('AUD$', 'FGA_LOG$');
  16. select TABLE_NAME, SEGMENT_NAME, TABLESPACE_NAME from dba_lobs where table_name in ('AUD$', 'FGA_LOG$');
  17. --审计段大小统计
  -- Space used by SYS.AUD$ in MB, split into table, index and LOB segments.
  --
  -- Changes compared with the original listing:
  --   * the /*+ rule */ hint is removed (the rule-based optimizer is no longer
  --     supported, so the hint only hides plan problems);
  --   * implicit comma joins are replaced by explicit INNER JOINs;
  --   * the owner/table filter is applied inside each branch, so only AUD$
  --     segments are aggregated instead of every table in the database;
  --   * DECODE is replaced by CASE and the grand total gets a column alias.
  WITH segment_sizes AS (
      -- Table segments.
      SELECT
          t.owner,
          t.table_name,
          'table' AS seg_type,
          SUM(s.bytes) / 1024 / 1024 AS size_mb
      FROM dba_tables t
      INNER JOIN dba_segments s
          ON s.owner = t.owner
          AND s.segment_name = t.table_name
      WHERE t.owner = 'SYS'
          AND t.table_name = 'AUD$'
      GROUP BY t.owner, t.table_name
      UNION ALL
      -- LOB segments belonging to the table.
      SELECT
          l.owner,
          l.table_name,
          'lob' AS seg_type,
          SUM(s.bytes) / 1024 / 1024 AS size_mb
      FROM dba_lobs l
      INNER JOIN dba_segments s
          ON s.owner = l.owner
          AND s.segment_name = l.segment_name
      WHERE l.owner = 'SYS'
          AND l.table_name = 'AUD$'
      GROUP BY l.owner, l.table_name
      UNION ALL
      -- Index segments; reported under the owner of the indexed table.
      SELECT
          i.table_owner AS owner,
          i.table_name,
          'index' AS seg_type,
          SUM(s.bytes) / 1024 / 1024 AS size_mb
      FROM dba_indexes i
      INNER JOIN dba_segments s
          ON s.owner = i.owner
          AND s.segment_name = i.index_name
      WHERE i.table_owner = 'SYS'
          AND i.table_name = 'AUD$'
      GROUP BY i.table_owner, i.table_name
  )
  SELECT
      owner,
      table_name,
      SUM(CASE WHEN seg_type = 'table' THEN size_mb END) AS tab_size_mb,
      SUM(CASE WHEN seg_type = 'index' THEN size_mb END) AS idx_size_mb,
      SUM(CASE WHEN seg_type = 'lob' THEN size_mb END) AS lob_size_mb,
      SUM(size_mb) AS total_size_mb
  FROM segment_sizes
  GROUP BY owner, table_name;

Oracle审计相关内容参考:

https://blog.csdn.net/demonson/article/details/116146370

三、根据需求设计审计规则

  1. 需求:
  2. 要求ELK收集现有数据库所有sql语句,但由于DML语句过于频繁,只收集数据库所有DDL语句(不管语句是否执行成功,都收集上);

根据以上需求和上面的Oracle审计技术文档,判断语句级审计即可满足需求。需要审计的项目:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RlbW9uc29u_size_16_color_FFFFFF_t_70

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RlbW9uc29u_size_16_color_FFFFFF_t_70 1

  1. 先清空oracle默认开启的审计项
  2. select * from sys.audit$;
  3. --sys用户登录
  4. truncate table audit$;

再执行下面的语句级审计(审计DDL操作)

  1. 审计语句:
  2. AUDIT DATABASE LINK;
  3. AUDIT INDEX;
  4. AUDIT PROCEDURE;
  5. AUDIT SEQUENCE;
  6. AUDIT SYSTEM GRANT;
  7. AUDIT TABLE;
  8. AUDIT TABLESPACE;
  9. AUDIT TRIGGER;
  10. AUDIT USER;
  11. AUDIT TYPE;
  12. AUDIT VIEW;
  13. AUDIT ALTER SEQUENCE;
  14. AUDIT ALTER TABLE;

重启数据库

  1. shut immediate
  2. startup

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RlbW9uc29u_size_16_color_FFFFFF_t_70 2

  1. 涉及到的SQL:
  2. select * from sys.audit$;
  3. select * from sys.aud$;
  4. select * from dba_priv_audit_opts;
  5. select * from dba_stmt_audit_opts;
  6. select * from sys.AUDIT$ a inner join sys.STMT_AUDIT_OPTION_MAP b on a.option#=b.OPTION#;
  7. select userid,userhost, to_char(substr(SQLTEXT,1,3000)) as sql,action#,returncode,to_char(NTIMESTAMP#,'yyyy-mm-dd hh24:mi:ss'),
  8. SQLBIND,SQLTEXT from sys.aud$ t where userid='GGS'order by t.ntimestamp# desc;
  9. 取消审计语句:
  10. NOAUDIT DATABASE LINK;
  11. NOAUDIT INDEX;
  12. NOAUDIT PROCEDURE;
  13. NOAUDIT SEQUENCE;
  14. NOAUDIT SYSTEM GRANT;
  15. NOAUDIT TABLE;
  16. NOAUDIT TABLESPACE;
  17. NOAUDIT TRIGGER;
  18. NOAUDIT USER;
  19. NOAUDIT TYPE;
  20. NOAUDIT VIEW;
  21. NOAUDIT ALTER SEQUENCE;
  22. NOAUDIT ALTER TABLE;
  23. 取消所有审计语句:
  24. NOAUDIT ALL;

#取消审计等操作若不生效,经测试,重新开启审计配置并重启数据库即可解决

四、ELK抓取审计语句

1.sys用户下创建视图

  -- Flattened view of SYS.AUD$ for export. The column order matters: the
  -- export script reads it with SELECT * and joins the values with commas.
  CREATE OR REPLACE VIEW audit_sql_vw AS
  SELECT
      -- Audit time; 1/3 day (8 hours) is added to NTIMESTAMP#.
      -- NOTE(review): presumably shifts UTC to UTC+8 - confirm for your site.
      TO_CHAR(aud.ntimestamp# + 1 / 3, 'yyyy-mm-dd hh24:mi:ss') AS auditime,
      aud.userid AS username,                        -- database user name
      aud.userhost AS hostname,                      -- client host name
      act.name AS sqltype,                           -- statement category
      aud.obj$name AS objname,                       -- object name
      aud.returncode,                                -- return code, 0 = success
      TO_CHAR(SUBSTR(aud.sqltext, 1, 3000)) AS sql   -- audited SQL text (first 3000 characters)
  FROM sys.aud$ aud
  LEFT JOIN sys.audit_actions act
      ON aud.action# = act.action;

2.编写Python脚本,将数据库中的审计日志导出到磁盘上

  1. #创建日志目录
  2. su - oracle
  3. mkdir /u01/app/oracle/admin/orcl/audit
  4. vim check_audit.py
  5. #!/usr/bin/python
  6. #coding=utf-8
  7. import os
  8. import cx_Oracle
  9. os.environ['ORACLE_HOME'] = '/u01/app/oracle/product/11.2.0/db_1'
  10. os.environ['ORACLE_SID'] = 'orcl'
  11. os.environ['PATH']
  12. #连接数据库,查视图
  def check_audit_qurey():
      """Export every row of audit_sql_vw to audit.log, then purge sys.aud$.

      Each row becomes one line of comma-joined column values. The log file
      is rewritten on every run.

      The audit table is truncated only after the rows have been fetched AND
      written to disk. The original truncated inside ``finally``, which
      destroyed audit records even when the query or the file write failed.
      If the query fails, the error is printed and neither the log file nor
      sys.aud$ is touched.
      """
      conn = cx_Oracle.connect('sys','*******','192.168.1.96:14320/orcl',mode=cx_Oracle.SYSDBA)
      try:
          cursor = conn.cursor()
          try:
              # Read the audit view.
              try:
                  cursor.execute("select * from audit_sql_vw")
                  rows = cursor.fetchall()
              except Exception as exc:
                  # Report the actual error instance, not the Exception class.
                  print("connenct oracle error,dblink error!", exc)
                  return

              # One output line per audit row, columns separated by commas.
              msg = ''.join(",".join(map(str, row)) + '\n' for row in rows)

              # Write the export; the context manager closes the file even
              # if the write fails.
              with open('/u01/app/oracle/admin/orcl/audit/audit.log', 'w+') as f:
                  f.write(msg)

              # Purge the audit trail so it does not grow without bound.
              # NOTE(review): rows audited between the SELECT above and this
              # TRUNCATE are removed without being exported - confirm this
              # window is acceptable.
              cursor.execute("truncate table sys.aud$")
          finally:
              cursor.close()
      finally:
          conn.close()


  def main():
      """Script entry point: run one export/purge cycle."""
      check_audit_qurey()


  if __name__ == "__main__":
      main()

3.每半小时执行一次脚本

  1. crontab -e
  2. */30 * * * * /usr/bin/python /u01/app/oracle/admin/orcl/audit/check_audit.py > /u01/app/oracle/admin/orcl/audit/exec_check_audit.log 2>&1

4.安装filebeat读取oracle审计日志

  1. 从网上下载filebeat的rpm安装包
  2. rpm -ivh filebeat-7.12.0-x86_64.rpm
  3. warning: filebeat-7.12.0-x86_64.rpm: Header V4 RSA/SHA512 Signature, key ID d88e42b4: NOKEY
  4. Preparing... ########################################### [100%]
  5. 1:filebeat ########################################### [100%]
  6. #修改配置文件
  7. vim /etc/filebeat/filebeat.yml
  filebeat.inputs:
    # audit.log: audit SQL exported by the cron job (rewritten every half hour).
    - type: log
      enabled: true
      paths:
        - /u01/app/oracle/admin/orcl/audit/audit.log
      tags: ["oracle-audit-log"]
      # A new event starts only on a line that BEGINS with the exported
      # timestamp followed by a comma ("yyyy-mm-dd hh24:mi:ss,"). The original
      # unanchored '\d{4}-\d{2}-\d{2}' also matched dates inside the SQL text
      # and split one multi-line statement into several events.
      multiline.pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},'
      multiline.negate: true
      multiline.match: after
      fields:
        log_source: oracle-audit-log

  filebeat.config.modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

  setup.template.settings:
    index.number_of_shards: 3

  setup.kibana:

  # Ship events to the Logstash beats input.
  output.logstash:
    hosts: ["192.168.1.207:55051"]
  28. #启动filebeat
  29. service filebeat start
  30. service filebeat status
  31. service filebeat stop
  32. #日志查看
  33. tail -f /var/log/filebeat/filebeat

5.创建logstash配置文件,重启远程logstash

  1. #配置logstash内置正则字段
  2. vim /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns
  3. 添加
  4. ORACLE_TIME (Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s(\d{2})\s(\d{2}):(\d{2}):(\d{2})\s(\d{4})
  5. DB_LOGCONTENT .*
  6. SQL_ID [A-Za-z0-9]+
  7. ORA_MACHINE .*?
  8. ORA_USER .*?
  9. ORA_TYPE .*?
  10. ORA_OBJECT .*?
  11. ORA_SQL .*
  12. logstash配置文件创建
  13. vim /etc/logstash/conf.d/oracle-log.conf
  14. vim /etc/logstash/conf.d/oracle-log.conf
  # Logstash pipeline for Oracle logs shipped by Filebeat.
  # Events are routed by tag: oracle-log, oracle-slow-log, oracle-audit-log.
  input {
    beats {
      port => 55051
    }
  }

  filter {
    if "oracle-log" in [tags] {
      # NOTE(review): [beat][hostname] may not be populated by Filebeat 7.x
      # shippers (agent.hostname / host.name are used there) - confirm.
      mutate { add_field => { "clienthost" => "%{[beat][hostname]}" } }
      grok {
        match => {
          "message" => "%{ORACLE_TIME:datetime}\n%{DB_LOGCONTENT:logmessage}"
        }
      }
    }

    if "oracle-slow-log" in [tags] {
      grok {
        # (?m) lets "." match newlines, so a multi-line SQL statement is
        # captured whole instead of being cut at the first line break.
        match => [ "message" , "(?m)%{TIMESTAMP_ISO8601:time},%{SQL_ID:sql_id},%{NUMBER:elapsed_time:float},%{NUMBER:cpu_time:float},%{NUMBER:iowait_time:float},%{NUMBER:gets:int},%{NUMBER:reads:int},%{NUMBER:rows:int},%{NUMBER:cluster_wait_time:float},%{NUMBER:execs:int},%{NUMBER:elpe_time:float},%{ORA_MACHINE:machine},%{ORA_USER:username},%{ORA_SQL:sql}" ]
      }
      date {
        match => ["time", "yyyy-MM-dd HH:mm:ss"]
        target => "snapshot_time"
      }
    }

    if "oracle-audit-log" in [tags] {
      grok {
        # Field order follows the export line: time, user, client host,
        # statement type, object name, return code, SQL text.
        # (?m) keeps multi-line SQL text in a single "sql" field.
        match => [ "message" , "(?m)%{TIMESTAMP_ISO8601:time},%{ORA_USER:user_name},%{ORA_MACHINE:machine},%{ORA_TYPE:sql_type},%{ORA_OBJECT:object_name},%{NUMBER:return_code:int},%{ORA_SQL:sql}" ]
      }
      # NOTE(review): no timezone is set, so "time" is interpreted in the
      # Logstash host's local zone - confirm it matches the exported value.
      date {
        match => ["time", "yyyy-MM-dd HH:mm:ss"]
        target => "audit_time"
      }
    }
  }

  output {
    if "oracle-log" in [tags] {
      elasticsearch {
        hosts => ["192.168.1.207:9200"]
        manage_template => false
        index => "oracle-log-%{+YYYY.MM}"
      }
    }

    if "oracle-slow-log" in [tags] {
      elasticsearch {
        hosts => ["192.168.1.207:9200"]
        manage_template => false
        index => "oracle-slow-log-%{+YYYY.MM}"
      }
    }

    if "oracle-audit-log" in [tags] {
      elasticsearch {
        hosts => ["192.168.1.207:9200"]
        manage_template => false
        index => "oracle-audit-log-%{+YYYY.MM}"
      }
    }
  }
  70. #重启logstash
  71. systemctl restart logstash

6.Kibana创建索引,查看审计状态

Kibana创建索引

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RlbW9uc29u_size_16_color_FFFFFF_t_70 3

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2RlbW9uc29u_size_16_color_FFFFFF_t_70 4

发表评论

表情:
评论列表 (有 0 条评论,271人围观)

还没有评论,来说两句吧...

相关阅读