权限选择 VPC Administrator 与 Tenant Administrator 两个权限
授权范围选择所有资源,或跟你需要自己配置
点击完成即可。
3.1.4 填写代码创建过函数后,会进入函数编辑页面,将下面的代码写到编辑器里,点击下部署,或键盘按 Ctrl S 进行部署
# -*- coding:utf-8 -*-
import time
from urllib.parse import unquote_plus
from elasticsearch import Elasticsearch
from obs import ObsClient
def handler(event, context):
# 获取桶名与对象名
region_id, bucket_name, object_name = get_obs_obj_info(event.get("Records", None)[0])
context.getLogger().info(f"bucket name: {bucket_name}, object key: {object_name}")
ak = context.getAccessKey()
sk = context.getSecretKey()
server = 'obs.' region_id '.myhuaweicloud.com'
context.getLogger().info("before token")
context.getLogger().info(context.getToken())
context.getLogger().info("finish token")
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server)
# 获取对象元数据
object_metadata = obs_client.getObjectMetadata(bucket_name, object_name)
# 将头域转为字典
info_dict = {i[0]: i[1] for i in object_metadata["header"]}
info_dict["bucket_name"] = bucket_name
info_dict["object_name"] = object_name
# 为了不同系统下时区转换导致时间不统一,这里不使用 OBS 里的 last-modified 的 GMT 时间,改用时间戳
info_dict["create_time"] = int(time.time())
# 把对象大小转为数字格式
info_dict["content-length"] = int(info_dict["content-length"])
# 去除部分无用的 header
for i in ["id-2", "request-id", "connection", "last-modified", "uploadid"]:
if i in info_dict:
info_dict.pop(i)
# 把其他算子里包含的信息也一起保存下来
if "other_info" in event["dynamic_source"]:
info_dict.update(event["dynamic_source"]["other_info"])
context.getLogger().info(f"metadata to save: {info_dict}")
es_user = event["dynamic_source"]["es_user"]
es_password = event["dynamic_source"]["es_password"]
es_server_ip = event["dynamic_source"]["es_server"]
es_port = event["dynamic_source"]["es_port"]
context.getLogger().info(es_port)
if es_user != "" and es_password != "":
es_server = f"https://{es_user}:{es_password}@{es_server_ip}:{es_port}"
context.getLogger().info(es_server.replace(es_password, "xxxxxxx"))
else:
es_server = f"http://{es_server_ip}:{es_port}"
context.getLogger().info(es_server)
es = Elasticsearch([es_server], ca_certs=False, verify_certs=False)
response = es.index(index=bucket_name, body=info_dict)
context.getLogger().info(response)
return {
"statusCode": 200,
"isBase64Encoded": False,
"body": response,
"headers": {
"Content-Type": "application/Json"
}
}
def get_obs_obj_info(record):
if 's3' in record:
s3 = record['s3']
return record["eventRegion"], s3['bucket']['name'], unquote_plus(s3['object']['key'])
else:
obs_info = record['obs']
return record["eventRegion"], obs_info['bucket']['name'], \
unquote_plus(obs_info['object']['key'])
3.1.5 配置函数
1.配置依赖 在代码配置页最下找到添加依赖包按钮,分别添加公共依赖中的OBS 3.21.8 与 私有依赖中的fgpackage