"""Slack Bolt (asyncio) bot that forwards Slack messages to a DingTalk webhook.

Configuration is read from environment variables:

* ``SLACK_BOT_TOKEN``      - bot token used when authorizing requests.
* ``SLACK_SIGNING_SECRET`` - signing secret passed to ``AsyncApp``.
* ``DINGTALK_WEBHOOK``     - webhook URL passed to ``DingtalkChatbot``.

All three are looked up with ``os.environ[...]``, so a missing variable
raises ``KeyError`` (the webhook and signing secret at import time, the bot
token when a request is authorized).
"""
import asyncio
import json
import logging
import os

from dingtalkchatbot.chatbot import DingtalkChatbot
# Import the async app instead of the regular one
from slack_bolt.async_app import AsyncApp
from slack_bolt.authorization import AuthorizeResult
from slack_sdk.web.async_client import AsyncWebClient

logging.basicConfig(level=logging.DEBUG)


async def authorize(enterprise_id, team_id, user_id, client: AsyncWebClient, logger):
    """Authorize an incoming request using the single configured bot token.

    Logs the enterprise/team/user identifiers, then validates
    ``SLACK_BOT_TOKEN`` through ``auth.test`` and builds the result from that
    response.

    Raises:
        KeyError: if ``SLACK_BOT_TOKEN`` is not set.
    """
    logger.info(f"{enterprise_id},{team_id},{user_id}")
    # You can implement your own logic here
    token = os.environ["SLACK_BOT_TOKEN"]
    return AuthorizeResult.from_auth_test_response(
        auth_test_response=await client.auth_test(token=token),
        bot_token=token,
    )


# DingTalk webhook URL
dingtalk_webhook = os.environ["DINGTALK_WEBHOOK"]
dingtalkbot = DingtalkChatbot(dingtalk_webhook)

app = AsyncApp(
    signing_secret=os.environ["SLACK_SIGNING_SECRET"],
    authorize=authorize,
)


def _dingtalk_text(body):
    """Return the string to forward to DingTalk for a Slack event payload.

    Uses the event's ``text`` when it is a non-blank string; otherwise falls
    back to the JSON-serialised payload so that the event is still forwarded.
    """
    event = body.get("event") or {}
    text = event.get("text")
    if isinstance(text, str) and text.strip():
        return text
    return json.dumps(body, ensure_ascii=False)


@app.event("app_mention")
async def event_test(body, say, logger):
    """Log the payload of an ``app_mention`` event and reply in place."""
    logger.info(body)
    await say("What's up?")


@app.command("/hello-bolt-python")
async def command(ack, body, respond):
    """Acknowledge the ``/hello-bolt-python`` command and greet the caller."""
    await ack()
    await respond(f"Hi <@{body['user_id']}>!")


# NOTE(review): Bolt appears to treat the keyword as a regular expression, so
# this would match "Heads Up" followed by any number of "!" - confirm.
# NOTE(review): this handler only logs; if Bolt dispatches to the first
# matching listener, these messages are presumably not forwarded by `logall`
# - confirm that this is intended.
@app.message("Heads Up!*")
async def redirect(body, logger):
    """Log the payload of messages matching the "Heads Up" keyword."""
    logger.info(body)


@app.message("")
async def logall(body, logger):
    """Log a message event and forward its content to DingTalk.

    ``send_text`` needs a non-blank string, so the payload dict is converted
    with ``_dingtalk_text`` rather than being passed directly. The DingTalk
    client is synchronous, so the call runs in the default executor to avoid
    blocking the event loop that serves Slack requests.
    """
    logger.debug(body)
    text = _dingtalk_text(body)
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, dingtalkbot.send_text, text)


# Start the Slack listener
if __name__ == "__main__":
    app.start(3000)