experimental mail transport
This commit is contained in:
parent
e43d4837c3
commit
d52575aa42
12 changed files with 271 additions and 85 deletions
152
core/server.py
152
core/server.py
|
@ -1,55 +1,30 @@
|
|||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
import uvicorn
|
||||
from aiosmtpd.controller import Controller, UnixSocketController
|
||||
from aiosmtpd.lmtp import LMTP
|
||||
|
||||
from helper import init_loop
|
||||
from lmtp.protocol import LMTPHandler
|
||||
from lmtp.socket import UnixSocketLMTPController
|
||||
|
||||
|
||||
async def handle_echo(reader, writer):
|
||||
# session = Session()
|
||||
data = await reader.read(100)
|
||||
message = data.decode()
|
||||
addr = writer.get_extra_info('peername')
|
||||
|
||||
print(f"Received {message!r} from {addr!r}")
|
||||
|
||||
print(f"Send: {message!r}")
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
|
||||
print("Close the connection")
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
|
||||
class ExampleHandler:
|
||||
async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
|
||||
if not address.endswith('@example.com'):
|
||||
return '550 not relaying to that domain'
|
||||
envelope.rcpt_tos.append(address)
|
||||
return '250 OK'
|
||||
|
||||
async def handle_DATA(self, server, session, envelope):
|
||||
print('Message from %s' % envelope.mail_from)
|
||||
print('Message for %s' % envelope.rcpt_tos)
|
||||
print('Message data:\n')
|
||||
for ln in envelope.content.decode('utf8', errors='replace').splitlines():
|
||||
print(f'> {ln}'.strip())
|
||||
print()
|
||||
print('End of message')
|
||||
return '250 Message accepted for delivery'
|
||||
|
||||
|
||||
class LTMPController(Controller):
|
||||
def factory(self):
|
||||
return LMTP(self.handler)
|
||||
|
||||
|
||||
class UnixSocketLMTPController(UnixSocketController):
|
||||
def factory(self):
|
||||
return LMTP(self.handler)
|
||||
# async def handle_echo(reader, writer):
|
||||
# # session = Session()
|
||||
# data = await reader.read(100)
|
||||
# message = data.decode()
|
||||
# addr = writer.get_extra_info('peername')
|
||||
#
|
||||
# print(f"Received {message!r} from {addr!r}")
|
||||
#
|
||||
# print(f"Send: {message!r}")
|
||||
# writer.write(data)
|
||||
# await writer.drain()
|
||||
#
|
||||
# print("Close the connection")
|
||||
# writer.close()
|
||||
# await writer.wait_closed()
|
||||
|
||||
|
||||
class UvicornServer(uvicorn.Server):
|
||||
|
@ -57,7 +32,7 @@ class UvicornServer(uvicorn.Server):
|
|||
pass
|
||||
|
||||
|
||||
async def web():
|
||||
async def web(loop):
|
||||
log_config = uvicorn.config.LOGGING_CONFIG
|
||||
log_config["handlers"]["default"] = {"class": "logging.FileHandler", "filename": "web.log", "formatter": "default"}
|
||||
log_config["handlers"]["access"] = {"class": "logging.FileHandler", "filename": "web-access.log",
|
||||
|
@ -67,55 +42,59 @@ async def web():
|
|||
await server.serve()
|
||||
|
||||
|
||||
async def tcp():
|
||||
log = logging.getLogger('test')
|
||||
log.info("Starting TCP server")
|
||||
server = await asyncio.start_unix_server(handle_echo, path='test.sock')
|
||||
# async def tcp(loop):
|
||||
# log = logging.getLogger('test.log')
|
||||
# log.addHandler(logging.FileHandler('test.log'))
|
||||
# log.setLevel(logging.DEBUG)
|
||||
# log.info("Starting TCP server")
|
||||
# server = await asyncio.start_unix_server(handle_echo, path='test.sock')
|
||||
#
|
||||
# addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
|
||||
# log.info(f'Serving on {addrs}')
|
||||
#
|
||||
# async with server:
|
||||
# await server.serve_forever()
|
||||
# log.info("TCP done")
|
||||
|
||||
|
||||
async def lmtp(loop):
|
||||
import grp
|
||||
log = logging.getLogger('mail.log')
|
||||
log.addHandler(logging.FileHandler('mail.log'))
|
||||
# log.setLevel(logging.WARNING)
|
||||
log.setLevel(logging.INFO)
|
||||
log.info("Starting LMTP server")
|
||||
server = await UnixSocketLMTPController(LMTPHandler(), unix_socket='lmtp.sock', loop=loop).serve()
|
||||
|
||||
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
|
||||
log.info(f'Serving on {addrs}')
|
||||
|
||||
try:
|
||||
os.chmod('lmtp.sock', 0o775)
|
||||
current_uid = os.getuid()
|
||||
posix_gid = grp.getgrnam('postfix').gr_gid
|
||||
os.chown('lmtp.sock', current_uid, posix_gid)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
log.info("TCP done")
|
||||
|
||||
|
||||
async def lmtp():
|
||||
log = logging.getLogger('lmtp')
|
||||
log.info("Starting LMTP server")
|
||||
cont = UnixSocketLMTPController(ExampleHandler(), unix_socket='lmtp.sock')
|
||||
cont.start()
|
||||
log.info("LMTP done")
|
||||
|
||||
|
||||
async def shutdown(sig, loop):
|
||||
log = logging.getLogger()
|
||||
log.info(f"Received exit signal {sig.name}...")
|
||||
tasks = [t for t in asyncio.all_tasks() if t is not
|
||||
asyncio.current_task()]
|
||||
[task.cancel() for task in tasks]
|
||||
log.info(f"Cancelling {len(tasks)} outstanding tasks")
|
||||
await asyncio.wait_for(loop.shutdown_asyncgens(), timeout=10)
|
||||
loop.stop()
|
||||
log.info("Shutdown complete.")
|
||||
|
||||
|
||||
def main():
|
||||
import sdnotify
|
||||
import signal
|
||||
import setproctitle
|
||||
import os
|
||||
setproctitle.setproctitle("c3lf-sys3")
|
||||
logging.basicConfig(filename='server.log', level=logging.DEBUG, encoding='utf-8')
|
||||
logging.basicConfig(filename='test.log', level=logging.DEBUG, encoding='utf-8')
|
||||
logging.basicConfig(filename='lmtp.log', level=logging.DEBUG, encoding='utf-8')
|
||||
log = logging.getLogger()
|
||||
log = logging.getLogger('server.log')
|
||||
log.addHandler(logging.FileHandler('server.log'))
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.info("Starting server")
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.create_task(shutdown(signal.SIGTERM, loop)))
|
||||
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(signal.SIGINT, loop)))
|
||||
loop.create_task(web())
|
||||
loop.create_task(tcp())
|
||||
loop.create_task(lmtp())
|
||||
loop = init_loop()
|
||||
loop.create_task(web(loop))
|
||||
# loop.create_task(tcp(loop))
|
||||
loop.create_task(lmtp(loop))
|
||||
n = sdnotify.SystemdNotifier()
|
||||
n.notify("READY=1")
|
||||
log.info("Server ready")
|
||||
|
@ -123,8 +102,19 @@ def main():
|
|||
loop.run_forever()
|
||||
finally:
|
||||
loop.close()
|
||||
try:
|
||||
os.remove("lmtp.sock")
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
try:
|
||||
os.remove("web.sock")
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
log.error(e)
|
||||
logging.info("Server stopped")
|
||||
|
||||
logging.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue