1.在Azure 门户注册应用程序
微软文档地址
重定向的地址配置(微软地址): https://login.microsoftonline.com/common/oauth2/nativeclient
注册应用地址
2.程序代码
#安装包以及需要的驱动
pip3 install playwrightplaywright install
import base64
import json
import logging
from io import BytesIOfrom playwright.sync_api import sync_playwright
import time
import urllib.parse
import requestsfrom pia.utils.cache import get_redis_value, set_redis_expire
from pia.utils.constants import OUTLOOK_CLIENT_ID, OUTLOOK_TENANT_ID, OUTLOOK_CLIENT_SECRET, OUTLOOK_REDIRECT_URI, \COS_OUTLOOK_DIR, OUTLOOK_TOP
from pia.utils.cos_upload import upload_stream_to_cos, check_exists
from pia.utils.reids_key import OUTLOOK_TOKENlog = logging.getLogger(__name__)#账号登录 官方文档 https://learn.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow
def get_authorization_code(_user_name, _pass_word):with sync_playwright() as play_wright:browser = play_wright.chromium.launch(headless=True)context = browser.new_context(locale="zh-CN", accept_downloads=True)page = context.new_page()url = f"https://login.microsoftonline.com/{OUTLOOK_TENANT_ID}/oauth2/v2.0/authorize?client_id={OUTLOOK_CLIENT_ID}&response_type=code&redirect_uri={OUTLOOK_REDIRECT_URI}&response_mode=query&scope=https%3A%2F%2Fgraph.microsoft.com%2Fmail.read&state=12345"page.goto(url)page.click("[placeholder=\"电子邮件、电话或\\ Skype\"]")page.fill("[placeholder=\"电子邮件、电话或\\ Skype\"]", _user_name)with page.expect_navigation():page.click("text=下一步")page.click("[placeholder=\"密码\"]")page.fill("[placeholder=\"密码\"]", _pass_word)with page.expect_navigation():page.click("text=登录")page.click("text=是")time.sleep(3)query = dict(urllib.parse.parse_qsl(urllib.parse.urlsplit(page.url).query))authorization_code = query.get('code')context.close()browser.close()return authorization_codedef get_token(authorization_code):param = {'client_id': OUTLOOK_CLIENT_ID,'code': authorization_code,'redirect_uri': OUTLOOK_REDIRECT_URI,'grant_type': 'authorization_code','client_secret': OUTLOOK_CLIENT_SECRET}token_headers = {"Content-Type": "application/x-www-form-urlencoded"}token_url = f'https://login.microsoftonline.com/{OUTLOOK_TENANT_ID}/oauth2/v2.0/token'res = requests.post(url=token_url, headers=token_headers, data=param)access_token = json.loads(res.text).get('access_token')return f'Bearer {access_token}'# api官方文档 https://learn.microsoft.com/en-us/graph/api/mailfolder-list-messages?view=graph-rest-1.0
def get_inbox(authorization, str_start, str_end):condition = '?$filter = 'if str_start:condition += f'ReceivedDateTime ge {str_start} and 'condition += f'receivedDateTime lt {str_end}'# 获取收件箱里面的 邮件endpoint = f"https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages{condition}&$top={OUTLOOK_TOP}"http_headers = {'Authorization': authorization,'Accept': 'application/json','Content-Type': 'application/json'}data = requests.get(endpoint, headers=http_headers, stream=False).json()return datadef get_email_attachments(authorization, email_id):# 获取收件箱里面的邮件附件endpoint = f"https://graph.microsoft.com/v1.0/me/messages/{email_id}/attachments"http_headers = {'Authorization': authorization,'Accept': 'application/json','Content-Type': 'application/json'}data = requests.get(endpoint, headers=http_headers, stream=False).json()return data#程序入口
def deal_user_email(_user_name, _pass_word, str_start, str_end) -> list:result = []redis_key = f'{OUTLOOK_TOKEN}:{_user_name}'# 缓存authorization = get_redis_value(redis_key)if authorization:passelse:authorization_code = get_authorization_code(_user_name, _pass_word)authorization = get_token(authorization_code)if authorization:set_redis_expire(redis_key, authorization, 60 * 60)if authorization:email = get_inbox(authorization, str_start, str_end)if email:email_values = email.get("value")if email_values:for value in email_values:# 邮件 idemail_id = value.get("id")# 是否存在附件 True/Falsehas_attachments = value.get("hasAttachments")value.update({"attachments": {}})if has_attachments and email_id:attachment_dict = upload_attachment_to_cos(authorization, email_id, _user_name)value.update({"attachments": attachment_dict})result.append(value)else:log.error(f"outlook user_name: {_user_name} Authorization Failed")return result'''
附件上传到cos
'''def upload_attachment_to_cos(authorization, email_id, _user_name):attachment_dict = {}attachments = get_email_attachments(authorization, email_id)if attachments:attachment_values = attachments.get("value")if attachment_values:for _value in attachment_values:# 附件 nameattachment_name = _value.get("name")# 附件 内容attachment_content = _value.get("contentBytes")# Step 1: 解码 Base64 字符串decoded_data = base64.b64decode(attachment_content)# Step 2: 创建一个 BytesIO 对象作为文件流file_stream = BytesIO(decoded_data)object_name = f'{COS_OUTLOOK_DIR}/{_user_name}/{email_id}/{attachment_name}'is_exists, url = check_exists(object_name)if not is_exists:url = upload_stream_to_cos(file_stream, object_name)attachment_dict.update({attachment_name: url})return attachment_dict
import traceback
from datetime import datetime, timedeltafrom django.core.management import BaseCommand
import logging
import uuidfrom django.db import transactionfrom pia.models import PiaOutLookTask, PiaOutLookData
from pia.utils.aes import decrypted
from pia.utils.outlook import deal_user_emaillogging = logging.getLogger('task')#任务的方式拉取
class Command(BaseCommand):def add_arguments(self, parser):parser.add_argument('trace_id', type=str)def handle(self, *args, **options):trace_id = options["trace_id"]if not trace_id:trace_id = str(uuid.uuid4())self.trace_id = trace_idself.writeLog('outlook email start')outlook_task = PiaOutLookTask.objects.values("id", "user_name", "pwd", "execution_time")for x in outlook_task:_id = x.get("id")user_name = x.get("user_name")self.writeLog(f'############## outlook email user_name:{user_name} start ##############')pwd = x.get("pwd")execution_time = x.get("execution_time")# 获取当前时间current_time = datetime.now()# 格式化为 YYYY-MM-DDend_time = current_time.strftime('%Y-%m-%dT%H:%M:%S')try:if user_name and pwd:_pwd = decrypted(pwd)result = deal_user_email(user_name, _pwd, f'{execution_time}Z', F'{end_time}Z')with transaction.atomic():PiaOutLookTask.objects.filter(id=_id).update(status=0, execution_time=end_time)outlook_data = PiaOutLookData.objects.filter(outlook_task_id=_id, start_time=execution_time,end_time=end_time)if outlook_data:outlook_data.update(content=result)else:PiaOutLookData.objects.create(outlook_task_id=_id, start_time=execution_time,end_time=end_time, content=result)self.writeLog(f'############## outlook email user_name:{user_name} end ##############')except Exception:PiaOutLookTask.objects.filter(id=_id).update(status=1, execution_time=end_time)self.writeLog(f'############## outlook email user_name:{user_name} execution failed::::{traceback.format_exc()}')self.writeLog('outlook email end')def writeLog(self, msg: str):logging.info(f'[{self.trace_id}] {msg}')