0xGA: Artifact Content

Yet another PHP framework, but made for org-mode and geeks.

Artifact 140375a8023a15a5eeb9bf383e8297b2bb9e25b1:


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import io
import re
import pwd
import grp
import sys
import signal
import urllib
import shutil
import fnmatch
import logging
import threading
import configparser
from datetime import date
from http.server import HTTPServer
from http.server import BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from configparser import ConfigParser



class ResourceHelper:
    """Resolve a request path to a resource and send it to the client.

    A resource is either a regular file below ``srv/<appname>/`` or a method
    of the request handler (``App``) selected through the user-defined routes
    of the current domain in ``etc/routes.conf``.
    """

    # File extension (without the dot) -> value of the Content-Type header.
    content_types = {
        'html': 'text/html; charset=utf-8',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'png': 'image/png',
        'css': 'text/css; charset=utf-8',
        'otf': 'application/font-sfnt',
        'woff': 'application/font-woff',
        'atom': 'application/atom+xml; charset=utf-8',
        'json': 'application/json; charset=utf-8',
        'js': 'text/javascript; charset=utf-8',
        'py': 'text/plain',
        'gpg': 'text/plain',
        'el': 'text/plain',
        'txt': 'text/plain',
        'org': 'text/plain; charset=utf-8',
        'pdf': 'application/pdf'
    }

    def __init__(self, request, App):
        """Prepare the lookup of ``request`` on behalf of handler ``App``.

        :param request: raw (possibly percent-encoded) request path.
        :param App: request handler; this class reads its ``appname``,
            ``current_domain``, ``extension`` and ``wfile`` attributes.
        """
        # Local import: a bare ``import urllib`` does not guarantee that the
        # ``urllib.parse`` submodule is loaded.
        from urllib.parse import unquote

        self.routes = []
        self.App = App
        self.appname = App.appname

        routes = ConfigParser()
        routes.read('etc/routes.conf')
        if self.App.current_domain in routes:
            self.routes = routes[self.App.current_domain]

        self.request_path = unquote(request)
        self.resource_path = None
        self.resource_ext = None
        self.resource_is_method = False
        self.content = None
        self.content_type = None


    def guess_content_type(self):
        """Set ``resource_ext`` and ``content_type`` for the resolved resource.

        A method resource uses ``App.extension`` when it is set; otherwise
        the extension of ``resource_path`` is used. Both attributes are left
        untouched when nothing is resolved or the extension is unknown.
        """
        if self.resource_path is None:
            return

        if self.resource_is_method and self.App.extension is not None:
            ext = self.App.extension
        else:
            root, ext = os.path.splitext(self.resource_path)
            ext = ext[1:]  # drop the leading dot

        if ext != '' and ext in self.content_types:

            self.resource_ext = ext
            self.content_type = self.content_types[ext]

            logging.debug('[{0}] has extension {1} and content_type {2}'.format(
                self.resource_path, self.resource_ext, self.content_type))


    def test_local_file_exists(self, request):
        """Return True and record ``request`` as the resource if it is a file."""
        logging.debug("[{0}] Test if local file {0} exists.".format(request))

        if os.path.isfile(request):
            logging.debug("[{0}] Match.".format(request))
            self.resource_path = request
            return True

        return False


    def test_method_exists(self, request):
        """Return True and record ``request`` if it names a callable of App.

        Non-callable attributes are rejected: ``preprocess_content`` calls
        the resolved attribute, so accepting them would only crash later.
        """
        logging.debug("[{0}] Test if method {0} exists in your App".format(request))
        if request in dir(self.App) and callable(getattr(self.App, request, None)):
            logging.debug("[{0}] Match".format(request))
            self.resource_path = request
            self.resource_is_method = True
            return True

        return False


    def extract_interesting_routes(self):
        """Return the targets of every route whose pattern matches the request.

        Each ``*`` of a matching pattern captures a token of the request
        path; tokens are substituted positionally (``{0}``, ``{1}``, ...) into
        the route target.
        """
        interesting_routes = []

        for potential_path in self.routes:
            if not fnmatch.fnmatch(self.request_path, potential_path):
                continue

            logging.debug("[{0}] Found potential match with {1}".format(
                self.request_path, potential_path
            ))

            expand_wildcards = re.sub(r'\*', '(.+)', potential_path)
            tokens = re.search(expand_wildcards, self.request_path)

            if tokens:
                interesting_routes.append(
                    self.routes[potential_path].format(*tokens.groups()))

        return interesting_routes


    def _path_within_app_root(self, candidate):
        """Return ``candidate`` normalised, or None if it leaves srv/<appname>.

        The request path comes straight from the client, so ``..`` segments
        must not be allowed to reach files outside the application directory.
        """
        app_root = os.path.normpath(os.path.join('srv', self.appname))
        normalised = os.path.normpath(candidate)
        if normalised.startswith(app_root + os.sep):
            return normalised

        logging.warning("[{0}] Rejected path outside of {1}".format(
            candidate, app_root))
        return None


    def find_route(self):
        """Resolve the request to a file or a handler method.

        Lookup order: a file matching the request path below
        ``srv/<appname>``, then, for each matching user route, a handler
        method named by the route target followed by a file at that target.
        File candidates are normalised and must stay inside ``srv/<appname>``.

        :return: True when a resource has been resolved, False otherwise.
        """
        loc_path_test = self._path_within_app_root(
            'srv/{0}{1}'.format(self.appname, self.request_path))
        if loc_path_test is not None and self.test_local_file_exists(loc_path_test):
            return True

        logging.debug("[{0}] Try to find a candidate in the user defined routes".format(self.request_path))

        for route in self.extract_interesting_routes():
            if self.test_method_exists(route):
                return True

            route = self._path_within_app_root(
                'srv/{0}/{1}'.format(self.appname, route))
            if route is not None and self.test_local_file_exists(route):
                return True

        return False


    def preprocess_content(self):
        """Produce the content of a method resource and guess the content type.

        :return: False when the handler method returned ``None``, True
            otherwise (file resources always succeed here).
        """
        success = True
        if self.resource_is_method:
            logging.debug("[{0}] Spawn custom content method".format(self.resource_path))
            self.content = getattr(self.App, self.resource_path)()

            if self.content is None:
                success = False

        self.guess_content_type()
        return success


    def send_content(self):
        """Write the resolved resource to the handler's output stream."""
        if self.resource_is_method:
            payload = self.content
            # The output stream only accepts bytes; tolerate methods that
            # return text instead of crashing after the headers were sent.
            if isinstance(payload, str):
                payload = payload.encode('utf-8')
            self.App.wfile.write(payload)

        else:
            with open(self.resource_path, 'rb') as f:
                shutil.copyfileobj(f, self.App.wfile)



class IcanDoThat(BaseHTTPRequestHandler):
    """Main HTTP handler.

    Picks the configuration section matching the ``Host`` header of the
    request in ``etc/rc.conf`` and serves the resource resolved by
    ``ResourceHelper``. Subclasses set ``appname`` and may override
    ``do_interrupt``.
    """
    server_version = '0xGA/0.1'
    appname = 'DEFAULT'

    def __init__(self, request, client_address, server):
        # Every attribute must exist before the base constructor runs,
        # because the base constructor processes the request itself.
        self.error = False
        self.extension = None
        # Stays empty when the requested domain has no configuration section,
        # so the error pages can still be rendered.
        self.config = {}
        self.current_domain = server.server_name
        self.current_port = server.server_port

        BaseHTTPRequestHandler.__init__(self, request, client_address, server)


    def load_config(self):
        """Load the configuration section of the requested domain.

        Sets ``self.error`` to a message when the domain has no section in
        ``etc/rc.conf``; otherwise stores the section in ``self.config`` and
        runs ``do_interrupt`` once if the server flagged a pending SIGUSR1.
        """
        host_infos = self.headers.get('host','').split(':')
        self.current_domain = host_infos[0]
        logging.debug('Request for {0} processed by {1}'.format(
            self.current_domain,
            threading.current_thread().name))

        config = ConfigParser()
        config.read('etc/rc.conf')

        if not self.current_domain in config:
            self.error = 'Domain is not reachable in configuration file'

        else:
            self.config = config[self.current_domain]
            logging.debug('Request is for App {0}'.format(self.appname))

            if self.server.should_interrupt:
                logging.info('Processing request after SIGUSR1')
                self.server.should_interrupt = False
                self.do_interrupt()


    def log_message(self, message, *args):
        """Route the base class access log to the ``logging`` module."""
        logging.info(message % args)


    def do_interrupt(self):
        """Hook run on the first request after SIGUSR1; no-op by default."""
        pass


    def do_error_page(self, errno, reason):
        """Send the headers and the body of an error response.

        :param errno: HTTP status code as a string (e.g. ``'404'``); it is
            also the configuration key naming a custom error page file.
        :param reason: message shown on the built-in error page.
        """
        self.build_headers(int(errno), 'text/html; charset=utf-8')

        # ``config`` is missing or empty when the domain is unknown (501
        # path); fall back to the built-in page instead of raising.
        config = getattr(self, 'config', None)

        if config is not None and errno in config:
            with open(config[errno], 'rb') as f:
                shutil.copyfileobj(f, self.wfile)

        else:
            error_html = """<!DOCTYPE html>
<html>
<head>
  <title>{0}</title>
  <style type="text/css">
    body {{
      background-color: #d8d8d8;
      font-family: sans-serif;
    }}
    #content {{
      background-color: #ffffff;
      border: 1px solid #adadad;
      border-radius: 10px;
      width: 600px;
      margin: 20px auto;
      padding: 10px 20px;
    }}
  </style>
</head>
<body>
<div id="content">
<h1>Error {0}</h1>
<p>{1}</p>
</div>
</body>
</html>""".format(errno, reason)
            self.wfile.write(bytes(error_html, 'utf-8'))


    def do_404(self, reason='File not found'):
        """Log and send a 404 error page."""
        logging.error('[Error 404] {0}'.format(reason))
        self.do_error_page('404', reason)


    def do_501(self, reason='Lack of configuration'):
        """Log and send a 501 error page."""
        logging.critical('[Error 501] {0}'.format(reason))
        self.do_error_page('501', reason)


    def build_headers(self, resno, content_type = 'text/plain'):
        """Send the status line and headers.

        :param resno: HTTP status code (int).
        :param content_type: Content-Type value; ``None`` omits the header.
        """
        self.send_response(resno)
        if content_type is not None:
            self.send_header('Content-Type', content_type)
        self.end_headers()


    def do_PUT(self):
        """PUT is always refused with 403."""
        self.send_response(403)
        self.end_headers()


    def do_HEAD(self):
        """Answer HEAD with the status and headers GET would send, no body."""
        self.load_config()

        if self.error:
            self.send_response(501)
            self.end_headers()
            return

        rh = ResourceHelper(self.path, self)
        if rh.find_route() and rh.preprocess_content():
            # Same headers as do_GET, including the Content-Type.
            self.build_headers(200, rh.content_type)
        else:
            self.send_response(404)
            self.end_headers()


    def do_GET(self):
        """Serve the requested resource, or a 404 / 501 error page."""
        self.load_config()

        if not self.error:
            rh = ResourceHelper(self.path, self)

            if rh.find_route() and rh.preprocess_content():
                self.build_headers(200, rh.content_type)
                rh.send_content()

            else:
                self.do_404()

        else:
            self.do_501(self.error)



class NarvThreadedServer(ThreadingMixIn, HTTPServer):
    """HTTP server that handles each request in its own thread."""

    # Raised by the SIGUSR1 handler and cleared by the request handler that
    # processes the next request.
    should_interrupt = False


class Narv:
    """Bootstrap the server: logging, signal handlers, socket, privileges."""

    def __init__(self, server_infos=('', 8000), request_handler=IcanDoThat):
        """Configure the process and start serving in a background thread.

        The ``[general]`` section of ``etc/rc.conf`` may define ``debug``
        (name of a ``logging`` level) and ``drop_priviledges`` (boolean,
        default true).

        :param server_infos: ``(host, port)`` the server binds to.
        :param request_handler: handler class; its ``appname`` names the
            log file.
        """
        config = ConfigParser()
        config.read('etc/rc.conf')

        log_level = 'info'
        drop_priviledges = True
        narv_root_path = os.path.normpath(os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            '../../'))

        if 'general' in config:
            if 'debug' in config['general']:
                log_level = config['general']['debug']

            if 'drop_priviledges' in config['general']:
                drop_priviledges = config['general'].getboolean('drop_priviledges')

        log_file = '{0}.{1}.log'.format(
            request_handler.appname,
            date.today().strftime("%Y%m%d")
        )
        logging.basicConfig(
            format='%(asctime)s -- [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
            filename=os.path.join(narv_root_path, 'var/log/', log_file),
            level=self._resolve_log_level(log_level))

        signal.signal(signal.SIGINT, self.shutdown)
        signal.signal(signal.SIGTERM, self.shutdown)
        signal.signal(signal.SIGUSR1, self.interrupt)

        # The socket is bound here, while the process may still be root.
        self.server = NarvThreadedServer(server_infos, request_handler)

        if os.getuid() == 0 and drop_priviledges:
            self._drop_privileges(narv_root_path)
        else:
            if os.getuid() == 0:
                logging.warning('Running as root: drop_priviledges is disabled')
            os.chdir(narv_root_path)

        # Start serving only once the working directory is final, so that no
        # request resolves its relative paths against the launch directory.
        self._server_thread = threading.Thread(target=self.server.serve_forever)
        self._server_thread.daemon = True
        self._server_thread.start()


    @staticmethod
    def _resolve_log_level(name):
        """Return the ``logging`` level called ``name``, or INFO if unknown."""
        level = getattr(logging, str(name).upper(), None)
        return level if isinstance(level, int) else logging.INFO


    @staticmethod
    def _drop_privileges(root_path, user='nobody', group='nobody'):
        """Chroot into ``root_path`` and switch to an unprivileged user."""
        # Resolve the ids first: once chrooted, the system account databases
        # are no longer reachable.
        uid = pwd.getpwnam(user).pw_uid
        gid = grp.getgrnam(group).gr_gid

        os.chroot(root_path)
        os.chdir("/")
        logging.debug("Dropping priviledges to UID {0} GID {1}".format(user, group))

        # Remove group privileges
        os.setgroups([])

        # Group first: it can no longer be changed once the uid is dropped
        os.setgid(gid)
        os.setuid(uid)

        # Ensure a very conservative umask
        os.umask(0o077)


    def interrupt(self, signal, frame):
        """SIGUSR1 handler: flag the server for the next request."""
        logging.info('SIGUSR1 caught. Waiting for next request...')
        self.server.should_interrupt = True


    def start(self):
        """Block until the server stops."""
        logging.info("Serving {0} at port {1}".format(self.server.server_name, self.server.server_port))
        # The accept loop already runs in the background thread; running a
        # second serve_forever() here would have two loops compete for the
        # same listening socket. Joining with a timeout keeps the main thread
        # responsive to signals.
        while self._server_thread.is_alive():
            self._server_thread.join(timeout=1.0)


    def shutdown(self, signal, frame):
        """SIGINT / SIGTERM handler: stop the accept loop and exit."""
        logging.info("Kthxbye")
        self.server.shutdown()
        sys.exit(0)



if __name__ == "__main__":
    # Run the default application when the file is executed as a script.
    narv_app = Narv()
    narv_app.start()