自动监测linux服务器用户登录,并发送告警消息到微信


需要监测的主机(客户端):

[root@rsyslog-server ~]# cat login.sh
#!/bin/bash

# 监听登录事件
tail -n 0 -F /var/log/secure | while read LINE; do
    # 检查是否有用户登录事件
    if echo "$LINE" | grep -q "Accepted password"; then
        # 提取用户名和登录IP
        USERNAME=$(echo "$LINE" | awk '{print $9}')
        IP=$(echo "$LINE" | awk '{print $11}')
        HOSTNAME=$(echo "$LINE" | awk '{print $4}')
        json="{\"hostname\": \"${HOSTNAME}\", \"username\": \"${USERNAME}\", \"ipaddress\": \"${IP}\", \"Original\": \"${LINE}\"}"
        #echo "$json" >> /tmp/login.log
        #采用调用api的方式将信息发送到服务端。
        curl -X POST -H "Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=" -H "Content-Type: application/json" -d "${json}" http://192.168.100.113:5000/api/receive_json
    fi
done

将脚本加入开机自启中(客户端)

[root@rsyslog-server ~]# crontab -l
#rsyn time from aliyun ntpdate
*/5 * * * * /usr/sbin/ntpdate ntp1.aliyun.com &>/dev/null
@reboot /root/login.sh

[root@rsyslog-server ~]# ll login.sh
-rwxr-xr-x 1 root root 748 Aug 29 15:35 login.sh
[root@rsyslog-server ~]# pwd
/root

api接口调用(服务端):

python程序

cat main.py
from flask import Flask, request, jsonify
from flask_basicauth import BasicAuth
from send_message import SendMessage

app = Flask(__name__)
app.config['BASIC_AUTH_USERNAME'] = 'username'  # 设置用户名
app.config['BASIC_AUTH_PASSWORD'] = 'password'  # 设置密码

basic_auth = BasicAuth(app)

@app.route('/api/receive_json', methods=['POST'])
@basic_auth.required
def receive_json():
    try:
        data = request.json  # 获取客户端发送的 JSON 数据
        print("Received JSON data:", data)
        #执行发送消息等操作!此处为发送微信公众号消息
        sm = SendMessage()
        sm.send_message(json_data=data)

        return jsonify({"message": "JSON data received successfully"})
    except Exception as e:
        return jsonify({"error": str(e)})

if __name__ == '__main__':
    app.run(host='0.0.0.0',debug=True)
cat  access_token.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
class AccessToken(object):
    # 微信公众测试号账号(填写自己的)
    APPID = "wxf9ade4263600axxx"
    # 微信公众测试号密钥(填写自己的)
    APPSECRET = "51e57900e66ff46fd477959bd39c4xxx"

    def __init__(self, app_id=APPID, app_secret=APPSECRET) -> None:
        self.app_id = app_id
        self.app_secret = app_secret

    def get_access_token(self) -> str:
        """
        获取access_token凭证
        :return: access_token
        """
        url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.app_id}&secret={self.app_secret}"
        resp = requests.get(url)
        result = resp.json()
        if 'access_token' in result:
            return result["access_token"]
        else:
            print(result)
cat send_message.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

import json
import requests
from access_token import AccessToken

class SendMessage(object):
    # 消息接收者
    TOUSER = 'oaf5S6QbCO8r0VJcZgdY2ijvxxx'
    # 消息模板id
    TEMPLATE_ID = 'zVa3jDVubNPAVDhs1O1QOZJ4KZWDpT5qZUKkeY6Axxx'
    # 点击跳转链接(可无)
    CLICK_URL = 'https://www.cqgyd.com'

    def __init__(self, touser=TOUSER, template_id=TEMPLATE_ID, click_url=CLICK_URL) -> None:
        """
        构造函数
        :param touser: 消息接收者
        :param template_id: 消息模板id
        :param click_url: 点击跳转链接(可无)
        """
        self.access_token = AccessToken().get_access_token()
        self.touser = touser
        self.template_id = template_id
        self.click_url = click_url

    def get_send_data(self, json_data) -> object:
        """
        获取发送消息data
        :param json_data: json数据对应模板
        :return: 发送的消息体
        """
        return {
            "touser": self.touser,
            "template_id": self.template_id,
            "url": self.click_url,
            "topcolor": "#FF0000",
            # json数据对应模板
            "data": {
                "hostname": {
                    "value": json_data["hostname"],
                    # 字体颜色
                    "color": "#173177"
                },
                "username": {
                    "value": json_data["username"],
                    # 字体颜色
                    "color": "#173177"
                },
                "ipaddress": {
                    "value": json_data["ipaddress"],
                    "color": "#173177"
                },
                "Original": {
                    "value": json_data["Original"],
                    "color": "#173177"
                },
            }
        }

    def send_message(self, json_data) -> None:
        """
        发送消息
        :param json_data: json数据
        :return:
        """
        # 模板消息请求地址
        url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={self.access_token}"
        data = json.dumps(self.get_send_data(json_data))
        print(data)
        resp = requests.post(url, data=data)
        result = resp.json()
        if result["errcode"] == 0:
            print("消息发送成功")
        else:
            print(result)

postman模拟:

curl --location 'http://192.168.100.113:5000/api/receive_json' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=' \
--data '{"hostname":"cqds-db-mysql01","username":"root","ipaddress":"222.182.188.236","Original":"Aug 29 14:31:31 cqds-db-mysql01 sshd[12838]: Accepted password for root from 222.182.188.236 port 6091 ssh2"}'

声明:鹅石壳儿|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - 自动监测linux服务器用户登录,并发送告警消息到微信


Carpe Diem and Do what I like