54 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			54 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from dingtalkchatbot.chatbot import DingtalkChatbot
 | |
| from slack_bolt.async_app import AsyncApp
 | |
| from slack_bolt.authorization import AuthorizeResult
 | |
| from slack_sdk.web.async_client import AsyncWebClient
 | |
| import os
 | |
| import logging
 | |
# Configure the root logger at DEBUG level.
logging.basicConfig(level=logging.DEBUG)
 | |
| 
 | |
| # Import the async app instead of the regular one
 | |
| 
 | |
| 
 | |
async def authorize(enterprise_id, team_id, user_id, client: AsyncWebClient, logger):
    """Build the authorization result for an incoming Slack request.

    Logs the enterprise, team and user ids, reads the bot token from the
    ``SLACK_BOT_TOKEN`` environment variable, calls ``auth_test`` with that
    token and wraps the response in an ``AuthorizeResult``.

    Raises:
        KeyError: if ``SLACK_BOT_TOKEN`` is not set in the environment.
    """
    logger.info(f"{enterprise_id},{team_id},{user_id}")

    # You can implement your own logic here
    bot_token = os.environ["SLACK_BOT_TOKEN"]
    auth_response = await client.auth_test(token=bot_token)

    return AuthorizeResult.from_auth_test_response(
        auth_test_response=auth_response,
        bot_token=bot_token,
    )
 | |
| 
 | |
# DingTalk webhook URL, taken from the environment.
dingtalk_webhook = os.environ["DINGTALK_WEBHOOK"]

# DingTalk chatbot client bound to the webhook above.
dingtalkbot = DingtalkChatbot(dingtalk_webhook)

# Async Slack app; tokens are resolved per request by `authorize`.
app = AsyncApp(
    signing_secret=os.environ["SLACK_SIGNING_SECRET"],
    authorize=authorize,
)
 | |
| 
 | |
| 
 | |
@app.event("app_mention")
async def event_test(body, say, logger):
    """Handle an ``app_mention`` event: log the body and send a fixed reply."""
    logger.info(body)

    reply = "What's up?"
    await say(reply)
 | |
| 
 | |
| 
 | |
@app.command("/hello-bolt-python")
async def command(ack, body, respond):
    """Handle the ``/hello-bolt-python`` command by greeting the caller.

    The command is acknowledged first; the reply mentions the user whose id
    is found under ``body['user_id']``.
    """
    await ack()

    caller_id = body['user_id']
    await respond(f"Hi <@{caller_id}>!")
 | |
| 
 | |
| 
 | |
@app.message("Heads Up!*")
async def redirect(body, logger):
    """Log the body of a message matched by the ``"Heads Up!*"`` keyword."""
    # NOTE(review): despite the name, nothing is forwarded here; the handler
    # only logs at INFO level. Confirm whether a DingTalk relay was intended.
    logger.info(body)
 | |
| 
 | |
| 
 | |
@app.message("")
async def logall(body, logger):
    """Log the body of a matched message at DEBUG level."""
    # Registered with an empty keyword; presumably this matches every message
    # not already claimed by an earlier listener. Verify against Bolt docs.
    logger.debug(body)
 | |
| 
 | |
| 
 | |
# Start the Slack listener.
if __name__ == "__main__":
    app.start(port=3000)
 |