400行python 实现http/https 代理服务器,400行python,#!/usr/bin/p


#!/usr/bin/python#-*- coding:utf-8 -*-import socket, loggingimport select, errnoimport osimport sysimport tracebackimport gzipfrom StringIO import StringIOimport Queueimport threadingimport timeimport threadimport cgifrom cgi import parse_qsimport jsonimport impfrom os.path import join, getsizeimport reimport ssl##################user config ##################logger = logging.getLogger("network-server")#############################################def getTraceStackMsg():    tb = sys.exc_info()[2]    msg = ''    for i in traceback.format_tb(tb):        msg += i    return msgdef InitLog():    logger.setLevel(logging.DEBUG)    fh = logging.FileHandler("network-server.log")    fh.setLevel(logging.DEBUG)    ch = logging.StreamHandler()    ch.setLevel(logging.ERROR)    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")    ch.setFormatter(formatter)    fh.setFormatter(formatter)    logger.addHandler(fh)    logger.addHandler(ch)def clearfdpro(epoll_fd, params, fd):    try:        fd_check = int(fd)    except Exception, e:        print "fd error"        sys.exit(1)    try:        #print "pid:%s, close fd:%s" % (os.getpid(), fd)        epoll_fd.unregister(fd)    except Exception, e:        #print str(e)+getTraceStackMsg()        pass    try:        param = params[fd]        try:            addr = param["addr"]            if "next" in param:                print "close sock, %s:%s" % (addr[0], addr[1])        except Exception, e:            pass        param["connections"].shutdown(socket.SHUT_RDWR)        param["connections"].close()        f = param.get("f", None)        if f != None:            f.close()        rc = param.get("rc", None)        if rc != None:            rc.close()        if "read_cache_name" in param:            os.remove(param["read_cache_name"])    except Exception, e:        #print str(e)+getTraceStackMsg()        pass    try:        del params[fd]        #logger.error(getTraceStackMsg())        #logger.error("clear fd:%s" % fd)    except Exception, e:        #print str(e)+getTraceStackMsg()        passdef clearfd(epoll_fd, params, fd):    try:        param = params[fd]        if "nextfd" in param:            nextfd = param["nextfd"]            next_param = params[nextfd]            del param["nextfd"]            del next_param["nextfd"]            if not "next" in param: #masterfd                clearfdpro(epoll_fd, params, nextfd)            else: # nextfd                if not "writedata" in next_param or len(next_param["writedata"]) == 0:                    clearfdpro(epoll_fd, params, nextfd)                else:                    next_param["sendandclose"] = "true"        clearfdpro(epoll_fd, params, fd)    except Exception, e:        #print str(e)+getTraceStackMsg()        passdef FindHostPort(datas):    host_s = -1    host_e = -1    host_str = None    host = ""    port = ""    if not datas.startswith("CONNECT"):        host_s = datas.find("Host:")        if host_s < 0:            host_s = datas.find("host:")        if host_s > 0:            host_e = datas.find("\r\n", host_s)        if host_s > 0 and host_e > 0:            host_str = datas[host_s+5:host_e].strip()            add_list = host_str.split(":")            if len(add_list) == 2:                host = add_list[0]                port = add_list[1]            else:                host = add_list[0]                port = 80            first_seg = datas.find("\r\n")            first_line = datas[0:first_seg]            first_line = first_line.replace(" http://%s" % host_str, " ")            datas = first_line + datas[first_seg:]    else:        first_seg = datas.find("\r\n")        head_e = datas.find("\r\n\r\n")        if first_seg > 0 and head_e > 0:            first_line = datas[0:first_seg]            com,host_str,http_version = re.split('\s+', first_line)            add_list = host_str.split(":")            if len(add_list) == 2:                host = add_list[0]                port = add_list[1]            else:                host = add_list[0]                port = 443            host_s = 1            host_e = 1    return host_str,host_s,host_e,host,port,datasdef connect_pro(params, param, epoll_fd, datas, fd, cur_time, host, port):    try:        nextfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)        nextfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)        nextfd.settimeout(5)        try:            nextfd.connect((host, int(port)))        except Exception, e:            print "########%s,%s connect fail" % (host,port)        nextfd.setblocking(0)        next_fileno = nextfd.fileno()        print "pid:%s, connect %s:%s fd:%s" % (os.getpid(), host, port, next_fileno)        if next_fileno in params:            print "fileno exist"            sys.exit(1)        if not datas.startswith("CONNECT"):            next_param = {"addr":[host,port],"writelen":0, "connections":nextfd, "time":cur_time, "nextfd":fd}            param["nextfd"] = next_fileno            next_param["writedata"] = datas            next_param["writelen"] = 0            next_param["next"] = "true"            param["read_len"] = 0            param["readdata"] = ""            params[next_fileno] = next_param            epoll_fd.register(next_fileno, select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)            epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)        else:            next_param = {"addr":[host,port],"writelen":0, "connections":nextfd, "time":cur_time, "nextfd":fd}            param["nextfd"] = next_fileno            next_param["next"] = "true"            params[next_fileno] = next_param            epoll_fd.register(next_fileno, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)            param["read_len"] = 0            param["readdata"] = ""            param["writedata"] = "HTTP/1.1 200 Connection Established\r\nConnection: close\r\n\r\n"            param["writelen"] = 0            param["reuse"] = "true"            epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)    except socket.error, msg:        clearfd(epoll_fd,params,fd)def process_datas(process_status, params, param, epoll_fd, datas, read_len, fd, cur_time):    if process_status == "close":        clearfd(epoll_fd,params,fd)    else:        need_connect = False        host_str = None        host_s = -1        host_e = -1        if "reuse" in param and "next" not in param:            if not datas.startswith("CONNECT") and \                    not datas.startswith("GET") and \                    not datas.startswith("POST") and \                    not datas.startswith("PUT"):                del param["reuse"]            else:                host_str,host_s,host_e,host,port,datas = FindHostPort(datas)                host_s = int(host_s)                host_e = int(host_e)                next_fileno = param["nextfd"]                next_param = params[next_fileno]                addr = next_param["addr"]                if host_s > 0 and host_e > 0:                    if host != addr[0] or str(port) != str(addr[1]):                        print "%s,%s neq %s,%s" % (host,port,addr[0],addr[1])                        need_connect = True                        del param["nextfd"]                        del next_param["nextfd"]                        clearfd(epoll_fd,params,next_fileno)                    del param["reuse"]                else:                    param["read_len"] = read_len                    param["readdata"] = datas                    return None        if need_connect or not "nextfd" in param:            if host_str == None or not host_s > 0 or not host_e > 0:                host_str,host_s,host_e,host,port,datas = FindHostPort(datas)                host_s = int(host_s)                host_e = int(host_e)            if host_s > 0 and host_e > 0:                if not datas.startswith("CONNECT"):                    epoll_fd.modify(fd, select.EPOLLERR | select.EPOLLHUP) # 简单处理,http连接时把读去掉,避免内存攻击                thread.start_new_thread(connect_pro,(params, param, epoll_fd, datas, fd, cur_time, host, port))            else:                param["read_len"] = read_len                param["readdata"] = datas        else:            next_fileno = param["nextfd"]            next_param = params[next_fileno]            if "next" in param:                next_param["reuse"] = "true"            write_data = next_param.get("writedata", "")            write_data += datas            next_param["writedata"] = write_data            param["read_len"] = 0            param["readdata"] = ""            epoll_fd.modify(next_fileno, select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)        if process_status == "close_after_process":            print "close after process"            clearfd(epoll_fd,params,fd)def run_main(listen_fd):    try:        epoll_fd = select.epoll()        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)        print "listen_fd:%s" % listen_fd.fileno()    except select.error, msg:        logger.error(msg)    params = {}    last_min_time = -1    while True:        epoll_list = epoll_fd.poll()        cur_time = time.time()        for fd, events in epoll_list:            if fd == listen_fd.fileno():                while True:                    try:                        conn, addr = listen_fd.accept()                        conn.setblocking(0)                        epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)                        conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)                        #conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)                        params[conn.fileno()] = {"addr":addr,"writelen":0, "connections":conn, "time":cur_time}                    except socket.error, msg:                        break            elif select.EPOLLIN &amp; events:                param = params.get(fd,None)                if param == None:                    continue                param["time"] = cur_time                datas = param.get("readdata","")                cur_sock = params[fd]["connections"]                read_len = param.get("read_len", 0)                process_status = "close"                while True:                    try:                        data = cur_sock.recv(102400)                        if not data:                            if datas == "":                                break                            else:                                raise Exception("close after process")                        else:                            datas += data                            read_len += len(data)                    except socket.error, msg:                        if msg.errno == errno.EAGAIN:                            process_status = "process"                            break                        else:                            break                    except Exception, e:                        process_status = "close_after_process"                        break                process_datas(process_status, params, param, epoll_fd, datas, read_len, fd, cur_time)            elif select.EPOLLHUP &amp; events or select.EPOLLERR &amp; events:                clearfd(epoll_fd,params,fd)                logger.error("sock: %s error" % fd)            elif select.EPOLLOUT &amp; events:                param = params.get(fd,None)                if param == None:                    continue                param["time"] = cur_time                sendLen = param.get("writelen",0)                writedata = param.get("writedata", "")                total_write_len = len(writedata)                cur_sock = param["connections"]                f = param.get("f", None)                totalsenlen = param.get("totalsenlen", None)                if writedata == "":                    clearfd(epoll_fd,params,fd)                    continue                while True:                    try:                        sendLen += cur_sock.send(writedata[sendLen:])                        if sendLen == total_write_len:                            if f != None and totalsenlen != None:                                readmorelen = 102400                                if readmorelen > totalsenlen:                                    readmorelen = totalsenlen                                morefiledata = ""                                if readmorelen > 0:                                    morefiledata = f.read(readmorelen)                                if morefiledata != "":                                    writedata = morefiledata                                    sendLen = 0                                    total_write_len = len(writedata)                                    totalsenlen -= total_write_len                                    param["writedata"] = writedata                                    param["totalsenlen"] = totalsenlen                                    continue                                else:                                    f.close()                                    del param["f"]                                    del param["totalsenlen"]                            if not "sendandclose" in param:                                param["writedata"] = ""                                param["writelen"] = 0                                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)                            else:                                clearfd(epoll_fd,params,fd)                            break                    except socket.error, msg:                        if msg.errno == errno.EAGAIN:                            param["writelen"] = sendLen                            break                        clearfd(epoll_fd,params,fd)                        break            else:                continue        #check time out        if cur_time - last_min_time > 20:            last_min_time = cur_time            objs = params.items()            for (key_fd,value) in objs:                fd_time = value.get("time", 0)                del_time = cur_time - fd_time                if del_time > 20:                    clearfd(epoll_fd,params,key_fd)                elif fd_time < last_min_time:                    last_min_time = fd_timeif __name__ == "__main__":    reload(sys)    sys.setdefaultencoding('utf8')    InitLog()    port = int(sys.argv[1])    try:        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)    except socket.error, msg:        logger.error("create socket failed")    try:        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)    except socket.error, msg:        logger.error("setsocketopt SO_REUSEADDR failed")    try:        listen_fd.bind(('', port))    except socket.error, msg:        logger.error("bind failed")    try:        listen_fd.listen(10240)        listen_fd.setblocking(0)    except socket.error, msg:        logger.error(msg)    child_num = 19    c = 0    while c < child_num:        c = c + 1        newpid = os.fork()        if newpid == 0:            run_main(listen_fd)    run_main(listen_fd)

评论关闭