ITSM 流程中自动化对接 JumpServer 的实战经验
背景
最近在搞 ITSM 流程,其中有一个流程是关于堡垒机(JumpServer)权限申请的,主要设计账号注册和资产授权两个功能,这个流程需要实现自动化对接,也就是说用户提交申请之后可以自动注册账号或完成资产授权。自动化对接的本质就是接口调用,本文记录一下我使用 Python 封装的一些接口操作。
JumpServer 接口文档
官方关于文档的描述可以查看这个:https://docs.jumpserver.org/zh/v4/dev/rest_api/
具体的接口文档需要在 JumpServer 平台访问,访问地址为 http://<url>/api/docs/
,JumpServer 使用的 Django 开发的,接口文档是 DRF 提供的,效果如图:
官方提供了几种认证方式及调用示例:
吐槽一下
这里官方给的例子是让安装 drf-httpsig
库,实际上根本不需要,只需要安装 httpsig
就行,不然会多好几个不使用的库,包括 django
。
实战经验
接口请求封装
我这边对接的平台提供的认证方式是使用 AccessKeyID
和 AccessKeySecret
认证,下面是参考官方用例和实际场景进行的接口封装:
python
# -*- coding:utf-8 -*-
import datetime
import json
import logging
import sys
import time
import requests
from httpsig.requests_auth import HTTPSignatureAuth
if sys.version_info[0] == 2:
from imp import reload
reload(sys)
sys.setdefaultencoding('utf8')
FORMAT = '[%(asctime)s (line:%(lineno)d) %(levelname)s] %(message)s'
logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S', format=FORMAT)
logger = logging.getLogger(__name__)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
def print_json(_dict, indent=4):
print(json.dumps(_dict, indent=indent, ensure_ascii=False))
class JumpServer:
def __init__(self, jms_url, jms_org, ak, sk):
self.jms_url = jms_url
self.jms_org = jms_org
self.ak = ak
self.sk = sk
self.auth = HTTPSignatureAuth(
key_id=self.ak,
secret=self.sk,
algorithm='hmac-sha256',
headers=['(request-target)', 'accept', 'date']
)
self.root_node = self.get_root_node()
def get_root_node(self):
"""
返回根节点信息
:return:
"""
_, data = self.get_nodes({'key': '1'})
return data[0]
def request(self, api, method, org=None, **kwargs):
url = self.jms_url + api
org = org or self.jms_org
default_headers = {
'Accept': 'application/json',
'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
}
if org:
default_headers['X-JMS-ORG'] = org
merged_headers = default_headers.copy()
if kwargs.get('headers'):
merged_headers.update(kwargs['headers'])
kwargs['headers'] = merged_headers
kwargs['timeout'] = kwargs.get('timeout', 10)
resp = requests.request(method=method, url=url, auth=self.auth, **kwargs)
if resp.status_code < 400:
return resp.status_code, resp.json()
else:
logger.error('Request failed: {} {}'.format(resp.status_code, resp.text))
return resp.status_code, {}
def get_users(self, params=None):
"""
查询用户列表
:return:
"""
api = '/api/v1/users/users/'
method = 'GET'
return self.request(api, method, params=params)
def get_nodes(self, params=None):
"""
获取节点
:return:
"""
api = '/api/v1/assets/nodes/'
method = 'GET'
return self.request(api, method, params=params)
def get_assets(self, params=None):
"""
获取资产列表
:return:
"""
api = '/api/v1/assets/assets/'
method = 'GET'
return self.request(api, method, params=params)
def create_user(self, post_data):
"""
创建用户
{
'name': 'itsm02',
'username': 'itsm02',
'email': 'itsm02@abcd.xyz',
'system_roles': [],
'source': 'local',
'mfa_level': 0
}
:param post_data:
:return:
"""
api = '/api/v1/users/users/'
method = 'POST'
return self.request(api, method, json=post_data)
def create_node(self, node_id, post_data):
"""
创建子节点
:param node_id:
:param post_data:
:return:
"""
api = '/api/v1/assets/nodes/{id}/children/'.format(id=node_id)
method = 'POST'
return self.request(api, method, json=post_data)
def create_asset(self, post_data):
"""
创建资产
:param post_data:
:return:
"""
api = '/api/v1/assets/assets/'
method = 'POST'
return self.request(api, method, json=post_data)
def asset_permissions(self, post_data):
api = '/api/v1/perms/asset-permissions/'
method = 'POST'
return self.request(api, method, json=post_data)
def get_asset_id(self, node_value, host_data):
"""
通过主机信息获取一个资产ID,如果资产不存在则创建资产
:param node_value:
:param host_data: 主机信息,比如包括三个字段hostname,ip,platform
:return:
"""
_, assets = self.get_assets({'ip': host_data['ip']})
if assets:
asset_id = assets[0]['id']
else:
node_id = self.get_node_id(node_value)
post_data = {
'hostname': host_data['hostname'],
'ip': host_data['ip'],
'platform': host_data['platform'],
'nodes': [node_id]
}
_, asset_data = self.create_asset(post_data)
asset_id = asset_data['id']
return asset_id
def get_node_id(self, node_value):
"""
通过节点value返回节点ID,不存在就创建一个
:param node_value:
:return:
"""
_, nodes = self.get_nodes({'value': node_value})
if nodes:
node_id = nodes[0]['id']
else:
post_data = {
'value': node_value,
'full_value': '{}/{}'.format(self.root_node['full_value'], node_value)
}
_, node_data = self.create_node(self.root_node['id'], post_data)
node_id = node_data['id']
return node_id
下面是实例化的一些必要参数配置:
python
if __name__ == "__main__":
if local_ip.startswith('192.168.92.'):
JMS_URL = 'http://192.168.92.166'
JMS_ORG = ''
JMS_AK = '54695c02-fe15-45fd-a722-7ae102248f6b'
JMS_SK = 'c8f827ac-9361-4323-a972-5d7bc6914682'
SYSTEM_ROLES = []
MFA_LEVEL = 0
READ_USER_IDS = ['e8981c93-98dc-4112-ba96-77a4e19ac3a6']
OPS_USER_IDS = ['416a3b96-f119-4f7b-b0e3-ad4a7b958eab']
else:
JMS_URL = ''
JMS_ORG = ''
JMS_AK = ''
JMS_SK = ''
SYSTEM_ROLES = []
MFA_LEVEL = 0
READ_USER_IDS = []
OPS_USER_IDS = []
JMS_API = JumpServer(JMS_URL, JMS_ORG, JMS_AK, JMS_SK)
使用场景
1. 采集堡垒机账号
由于流程中在授权的时候需要用户选择授权账号,为了减少每次选择的时候重新查询堡垒机,于是我提前会定义同步堡垒机账号信息。
python
def update_users():
"""
更新用户到CMDB
:return:
"""
_, data = JMS_API.get_users()
datas = []
for each in data:
datas.append({
'id': each['id'],
'name': each['name'],
'username': each['username'],
'email': each['email'],
'source': each['source'],
'is_valid': each['is_valid'],
'date_expired': each['date_expired'],
})
logger.info('账号数量:{}'.format(len(datas)))
# 存储方式略
2. 创建账号
python
def create_users(user_infos):
"""
创建用户
:param user_infos:
:return:
"""
for user_info in user_infos:
source = user_info['source']['value'] # ldap|local
name = user_info['name']
username = user_info['username']
email = user_info['email']
post_data = {
'name': name,
'username': username,
'email': email,
'system_roles': SYSTEM_ROLES,
'source': source,
'mfa_level': MFA_LEVEL
}
print_json(post_data)
logger.info('开始创建账号:{}'.format(name))
_, data = JMS_API.create_user(post_data)
print_json(data)
3. 资产授权
资产授权的逻辑相对复杂一点,因为需要判断用户选择的主机是否存在资产,主要是有以下几种可能性及需要做的事情:
- 主机有对应的资产:
- 资产授权
- 主机没有对应的资产:
- 有系统命名的节点:
- 创建资产
- 资产授权
- 没有系统命名的节点:
- 创建节点
- 创建资产
- 资产授权
- 有系统命名的节点:
python
def date2date(from_date):
"""
将表单中的时间区间转换成堡垒机的开始和结束时间格式
:param from_date: ["2024-12-18T14:11:40+08:00", "2025-01-23T14:11:40+08:00"]
:return: ["2024/12/18 00:00:00 +0800", "2025/01/23 23:59:59 +0800"]
"""
start_date = '{} 00:00:00 +0800'.format(from_date[0][:10].replace('-', '/'))
end_date = '{} 23:59:59 +0800'.format(from_date[1][:10].replace('-', '/'))
return [start_date, end_date]
def asset_to_user(table_info, order_num):
"""
将主机授权给堡垒机用户
可能的情况:
1. 主机有对应的资产:
资产授权
2. 主机没有对应的资产:
2.1 有系统命名的节点:
创建资产
资产授权
2.2 没有系统命名的节点:
创建节点
创建资产
资产授权
:param table_info: 表单信息
:param order_num: 工单编号拿来命名
:return:
"""
date_str = datetime.datetime.now().strftime('%Y%m%d')
for _index, each in enumerate(table_info):
system_name = each['system'][0]['name']
host_datas = []
for host in each['hosts']:
host_datas.append({
'ip': host['ip'],
'hostname': host['hostname'],
'platform': host['osSystem']
})
user_ids = [u['id'] for u in each['users']]
os_user_type = each['hostUserType']['value'] # a:运维 b:只读
if os_user_type == 'a':
os_user_ids = OPS_USER_IDS
date_start, date_expired = date2date(each['opsDate'])
else:
os_user_ids = READ_USER_IDS
if each.get('readDate'):
date_start, date_expired = date2date(each['readDate'])
else:
date_start, date_expired = ['', '']
# 先创建或直接返回主机对应的资产ID
asset_ids = []
for host_data in host_datas:
asset_id = JMS_API.get_asset_id(system_name, host_data)
asset_ids.append(asset_id)
# 发送资产授权请求
name = '{}_{}_{}_{}'.format(system_name, order_num, date_str, _index)
post_data = {
'name': name,
'assets': asset_ids,
'users': user_ids,
'system_users': os_user_ids,
}
if date_start and date_expired:
post_data['date_start'] = date_start
post_data['date_expired'] = date_expired
print_json(post_data)
logger.info('开始创建资产授权:{}'.format(name))
_, data = JMS_API.asset_permissions(post_data)
print_json(data)