python-websocketd
Python module for creating a http server which uses WebSockets.
websocketd.py
Go to the documentation of this file.
1 # Python module for serving WebSockets and web pages.
2 # vim: set fileencoding=utf-8 foldmethod=marker :
3 
4 # {{{ Copyright 2013-2016 Bas Wijnen <wijnen@debian.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or(at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # }}}
18 
19 # Documentation. {{{
20 
36 
37 '''@file
38 This module can be used to create websockets servers and clients. A websocket
39 client is an HTTP connection which uses the headers to initiate a protocol
40 change. The server is a web server which serves web pages, and also responds
41 to the protocol change headers that clients can use to set up a websocket.
42 
43 Note that the server is not optimized for high traffic. If you need that, use
44 something like Apache to handle all the other content and set up a virtual
45 proxy to this server just for the websocket.
46 
47 In addition to implementing the protocol, this module contains a simple system
48 to use websockets for making remote procedure calls (RPC). This system allows
49 the called procedures to be generators, so they can yield control to the main
50 program and continue running when they need to. This system can also be used
51 locally by using call().
52 '''
53 
54 '''@package websocketd Client WebSockets and webserver with WebSockets support
55 This module can be used to create websockets servers and clients. A websocket
56 client is an HTTP connection which uses the headers to initiate a protocol
57 change. The server is a web server which serves web pages, and also responds
58 to the protocol change headers that clients can use to set up a websocket.
59 
60 Note that the server is not optimized for high traffic. If you need that, use
61 something like Apache to handle all the other content and set up a virtual
62 proxy to this server just for the websocket.
63 
64 In addition to implementing the protocol, this module contains a simple system
65 to use websockets for making remote procedure calls (RPC). This system allows
66 the called procedures to be generators, so they can yield control to the main
67 program and continue running when they need to. This system can also be used
68 locally by using call().
69 '''
70 # }}}
71 
72 # See the example server for how to use this module.
73 
74 # imports. {{{
75 import network
76 from network import endloop, log, set_log_output, add_read, add_write, add_timeout, add_idle, remove_read, remove_write, remove_timeout, remove_idle
77 import os
78 import re
79 import sys
80 import base64
81 import hashlib
82 import struct
83 import json
84 import collections
85 import tempfile
86 import time
87 import traceback
88 from urllib.parse import urlparse, parse_qs, unquote
89 from http.client import responses as httpcodes
90 # }}}
91 
92 #def tracer(a, b, c):
93 # print('trace: %s:%d:\t%s' % (a.f_code.co_filename, a.f_code.co_firstlineno, a.f_code.co_name))
94 #
95 #sys.settrace(tracer)
96 
97 
104 DEBUG = 0 if os.getenv('NODEBUG') else int(os.getenv('DEBUG', 1))
105 
106 
108 class Websocket:
109 
145  def __init__(self, port, url = '/', recv = None, method = 'GET', user = None, password = None, extra = {}, socket = None, mask = (None, True), websockets = None, data = None, real_remote = None, *a, **ka):
146  self.recvrecv = recv
147  self.maskmask = mask
148  self.websocketswebsockets = websockets
149  self.websocket_bufferwebsocket_buffer = b''
150  self.websocket_fragmentswebsocket_fragments = b''
151  self.opcodeopcode = None
152  self._is_closed_is_closed = False
153  self._pong_pong = True # If false, we're waiting for a pong.
154  if socket is None:
155  socket = network.Socket(port, *a, **ka)
156  self.socketsocket = socket
157  # Use real_remote if it was provided.
158  if real_remote:
159  if isinstance(socket.remote, (tuple, list)):
160  self.remoteremote = [real_remote, socket.remote[1]]
161  else:
162  self.remoteremote = [real_remote, None]
163  else:
164  self.remoteremote = socket.remote
165  hdrdata = b''
166  if url is not None:
167  elist = []
168  for e in extra:
169  elist.append('%s: %s\r\n' % (e, extra[e]))
170  if user is not None:
171  userpwd = user + ':' + password + '\r\n'
172  else:
173  userpwd = ''
174  socket.send(('''\
175 %s %s HTTP/1.1\r
176 Connection: Upgrade\r
177 Upgrade: websocket\r
178 Sec-WebSocket-Key: 0\r
179 %s%s\r
180 ''' % (method, url, userpwd, ''.join(elist))).encode('utf-8'))
181  while b'\n' not in hdrdata:
182  r = socket.recv()
183  if r == b'':
184  raise EOFError('EOF while reading reply')
185  hdrdata += r
186  pos = hdrdata.index(b'\n')
187  assert int(hdrdata[:pos].split()[1]) == 101
188  hdrdata = hdrdata[pos + 1:]
189  data = {}
190  while True:
191  while b'\n' not in hdrdata:
192  r = socket.recv()
193  if len(r) == 0:
194  raise EOFError('EOF while reading reply')
195  hdrdata += r
196  pos = hdrdata.index(b'\n')
197  line = hdrdata[:pos].strip()
198  hdrdata = hdrdata[pos + 1:]
199  if len(line) == 0:
200  break
201  key, value = [x.strip() for x in line.decode('utf-8', 'replace').split(':', 1)]
202  data[key] = value
203  self.datadata = data
204  self.socketsocket.read(self._websocket_read_websocket_read)
205  def disconnect(socket, data):
206  if not self._is_closed_is_closed:
207  self._is_closed_is_closed = True
208  if self.websocketswebsockets is not None:
209  self.websocketswebsockets.remove(self)
210  self.closedclosed()
211  return b''
212  if self.websocketswebsockets is not None:
213  self.websocketswebsockets.add(self)
214  self.socketsocket.disconnect_cb(disconnect)
215  self.openedopened()
216  if len(hdrdata) > 0:
217  self._websocket_read_websocket_read(hdrdata)
218  if DEBUG > 2:
219  log('opened websocket')
220  # }}}
221  def _websocket_read(self, data, sync = False):
222  # Websocket data consists of:
223  # 1 byte:
224  # bit 7: 1 for last(or only) fragment; 0 for other fragments.
225  # bit 6-4: extension stuff; must be 0.
226  # bit 3-0: opcode.
227  # 1 byte:
228  # bit 7: 1 if masked, 0 otherwise.
229  # bit 6-0: length or 126 or 127.
230  # If 126:
231  # 2 bytes: length
232  # If 127:
233  # 8 bytes: length
234  # If masked:
235  # 4 bytes: mask
236  # length bytes: (masked) payload
237 
238  #log('received: ' + repr(data))
239  if DEBUG > 2:
240  log('received %d bytes' % len(data))
241  if DEBUG > 3:
242  log('waiting: ' + ' '.join(['%02x' % x for x in self.websocket_bufferwebsocket_buffer]) + ''.join([chr(x) if 32 <= x < 127 else '.' for x in self.websocket_bufferwebsocket_buffer]))
243  log('data: ' + ' '.join(['%02x' % x for x in data]) + ''.join([chr(x) if 32 <= x < 127 else '.' for x in data]))
244  self.websocket_bufferwebsocket_buffer += data
245  while len(self.websocket_bufferwebsocket_buffer) > 0:
246  if self.websocket_bufferwebsocket_buffer[0] & 0x70:
247  # Protocol error.
248  log('extension stuff %x, not supported!' % self.websocket_bufferwebsocket_buffer[0])
249  self.socketsocket.close()
250  return None
251  if len(self.websocket_bufferwebsocket_buffer) < 2:
252  # Not enough data for length bytes.
253  if DEBUG > 2:
254  log('no length yet')
255  return None
256  b = self.websocket_bufferwebsocket_buffer[1]
257  have_mask = bool(b & 0x80)
258  b &= 0x7f
259  if have_mask and self.maskmask[0] is True or not have_mask and self.maskmask[0] is False:
260  # Protocol error.
261  log('mask error')
262  self.socketsocket.close()
263  return None
264  if b == 127:
265  if len(self.websocket_bufferwebsocket_buffer) < 10:
266  # Not enough data for length bytes.
267  if DEBUG > 2:
268  log('no 4 length yet')
269  return None
270  l = struct.unpack('!Q', self.websocket_bufferwebsocket_buffer[2:10])[0]
271  pos = 10
272  elif b == 126:
273  if len(self.websocket_bufferwebsocket_buffer) < 4:
274  # Not enough data for length bytes.
275  if DEBUG > 2:
276  log('no 2 length yet')
277  return None
278  l = struct.unpack('!H', self.websocket_bufferwebsocket_buffer[2:4])[0]
279  pos = 4
280  else:
281  l = b
282  pos = 2
283  if len(self.websocket_bufferwebsocket_buffer) < pos + (4 if have_mask else 0) + l:
284  # Not enough data for packet.
285  if DEBUG > 2:
286  log('no packet yet(%d < %d)' % (len(self.websocket_bufferwebsocket_buffer), pos + (4 if have_mask else 0) + l))
287  # Long packets should not cause ping timeouts.
288  self._pong_pong = True
289  return None
290  header = self.websocket_bufferwebsocket_buffer[:pos]
291  opcode = header[0] & 0xf
292  if have_mask:
293  mask = [x for x in self.websocket_bufferwebsocket_buffer[pos:pos + 4]]
294  pos += 4
295  data = self.websocket_bufferwebsocket_buffer[pos:pos + l]
296  # The following is slow!
297  # Don't do it if the mask is 0; this is always true if talking to another program using this module.
298  if mask != [0, 0, 0, 0]:
299  data = bytes([x ^ mask[i & 3] for i, x in enumerate(data)])
300  else:
301  data = self.websocket_bufferwebsocket_buffer[pos:pos + l]
302  self.websocket_bufferwebsocket_buffer = self.websocket_bufferwebsocket_buffer[pos + l:]
303  if self.opcodeopcode is None:
304  self.opcodeopcode = opcode
305  elif opcode != 0:
306  # Protocol error.
307  # Exception: pongs are sometimes sent asynchronously.
308  # Theoretically the packet can be fragmented, but that should never happen; asynchronous pongs seem to be a protocol violation anyway...
309  if opcode == 10:
310  # Pong.
311  self._pong_pong = True
312  else:
313  log('invalid fragment')
314  self.socketsocket.close()
315  return None
316  if (header[0] & 0x80) != 0x80:
317  # fragment found; not last.
318  self._pong_pong = True
319  self.websocket_fragmentswebsocket_fragments += data
320  if DEBUG > 2:
321  log('fragment recorded')
322  return None
323  # Complete frame has been received.
324  data = self.websocket_fragmentswebsocket_fragments + data
325  self.websocket_fragmentswebsocket_fragments = b''
326  opcode = self.opcodeopcode
327  self.opcodeopcode = None
328  if opcode == 8:
329  # Connection close request.
330  self.closeclose()
331  return None
332  elif opcode == 9:
333  # Ping.
334  self.sendsend(data, 10) # Pong
335  elif opcode == 10:
336  # Pong.
337  self._pong_pong = True
338  elif opcode == 1:
339  # Text.
340  data = data.decode('utf-8', 'replace')
341  if sync:
342  return data
343  if self.recvrecv:
344  self.recvrecv(self, data)
345  else:
346  log('warning: ignoring incoming websocket frame')
347  elif opcode == 2:
348  # Binary.
349  if sync:
350  return data
351  if self.recvrecv:
352  self.recvrecv(self, data)
353  else:
354  log('warning: ignoring incoming websocket frame (binary)')
355  else:
356  log('invalid opcode')
357  self.socketsocket.close()
358  # }}}
359 
363  def send(self, data, opcode = 1): # Send a WebSocket frame. {{{
364  if DEBUG > 3:
365  log('websend:' + repr(data))
366  assert opcode in(0, 1, 2, 8, 9, 10)
367  if self._is_closed_is_closed:
368  return None
369  if opcode == 1:
370  data = data.encode('utf-8')
371  if self.maskmask[1]:
372  maskchar = 0x80
373  # Masks are stupid, but the standard requires them. Don't waste time on encoding (or decoding, if also using this module).
374  mask = b'\0\0\0\0'
375  else:
376  maskchar = 0
377  mask = b''
378  if len(data) < 126:
379  l = bytes((maskchar | len(data),))
380  elif len(data) < 1 << 16:
381  l = bytes((maskchar | 126,)) + struct.pack('!H', len(data))
382  else:
383  l = bytes((maskchar | 127,)) + struct.pack('!Q', len(data))
384  try:
385  self.socketsocket.send(bytes((0x80 | opcode,)) + l + mask + data)
386  except:
387  # Something went wrong; close the socket(in case it wasn't yet).
388  if DEBUG > 0:
389  traceback.print_exc()
390  log('closing socket due to problem while sending.')
391  self.socketsocket.close()
392  if opcode == 8:
393  self.socketsocket.close()
394  # }}}
395 
399  def ping(self, data = b''): # Send a ping; return False if no pong was seen for previous ping. Other received packets also count as a pong. {{{
400  ret = self._pong_pong
401  self._pong_pong = False
402  self.sendsend(data, opcode = 9)
403  return ret
404  # }}}
405 
408  def close(self): # Close a WebSocket. (Use self.socket.close for other connections.) {{{
409  self.sendsend(b'', 8)
410  self.socketsocket.close()
411  # }}}
412 
419  def opened(self):
420  pass
421  # }}}
422 
426  def closed(self):
427  pass
428  # }}}
429 # }}}
430 
431 # Set of inactive websockets, and idle handle for _activate_all.
432 
435 _activation = [set(), None]
436 
437 
447 def call(reply, target, *a, **ka):
448  ret = target(*a, **ka)
449  if type(ret) is not RPC._generatortype:
450  if reply is not None:
451  reply(ret)
452  return
453  # Target is a generator.
454  def wake(arg = None):
455  try:
456  return ret.send(arg)
457  except StopIteration as result:
458  if reply is not None:
459  reply(result.value)
460  # Start the generator.
461  wake()
462  # Send it its wakeup function.
463  wake(wake)
464 # }}}
465 
466 
480 class RPC(Websocket):
481 
484  _generatortype = type((lambda: (yield))())
485 
488  _index = 0
489 
492  _calls = {}
493  @classmethod
494  def _get_index(cls):
495  while cls._index_index_index in cls._calls_calls:
496  cls._index_index_index += 1
497  # Put a limit on the _index values.
498  if cls._index_index_index >= 1 << 31:
499  cls._index_index_index = 0
500  while cls._index_index_index in cls._calls_calls:
501  cls._index_index_index += 1
502  return cls._index_index_index
503  # }}}
504 
512  def __init__(self, port, recv = None, error = None, *a, **ka):
513  _activation[0].add(self)
514  if _activation[1] is None:
515  _activation[1] = add_idle(_activate_all)
516  self._delayed_calls_delayed_calls = []
517 
518  self.groupsgroups = set()
519  Websocket.__init__(self, port, recv = RPC._recv, *a, **ka)
520  self._error_error = error
521  self._target_target = recv(self) if recv is not None else None
522  # }}}
523 
526  def __call__(self):
527  if self._delayed_calls_delayed_calls is None:
528  return
529  calls = self._delayed_calls_delayed_calls
530  self._delayed_calls_delayed_calls = None
531  for call in calls:
532  if not hasattr(self._target_target, call[1]) or not isinstance(getattr(self._target_target, call[1]), collections.Callable):
533  self._send_send('error', 'invalid delayed call frame %s' % repr(call))
534  else:
535  self._call_call(call[0], call[1], call[2], call[3])
536  # }}}
537  class _wrapper:
538  def __init__(self, base, attr):
539  self.base = base
540  self.attr = attr
541  # }}}
542  def __call__(self, *a, **ka):
543  my_id = RPC._get_index()
544  self.base._send('call', (my_id, self.attr, a, ka))
545  my_call = [None]
546  RPC._calls[my_id] = lambda x: my_call.__setitem__(0, (x,)) # Make it a tuple so it cannot be None.
547  while my_call[0] is None:
548  data = self.base._websocket_read(self.base.socket.recv(), True)
549  while data is not None:
550  self.base._recv(data)
551  data = self.base._websocket_read(b'')
552  del RPC._calls[my_id]
553  return my_call[0][0]
554  # }}}
555  def __getitem__(self, *a, **ka):
556  self.base._send('call', (None, self.attr, a, ka))
557  # }}}
558  def bg(self, reply, *a, **ka):
559  my_id = RPC._get_index()
560  self.base._send('call', (my_id, self.attr, a, ka))
561  RPC._calls[my_id] = lambda x: self.do_reply(reply, my_id, x)
562  # }}}
563  def do_reply(self, reply, my_id, ret):
564  del RPC._calls[my_id]
565  reply(ret)
566  # }}}
567  # alternate names. {{{
568  def call(self, *a, **ka):
569  self.__call____call__(*a, **ka)
570  def event(self, *a, **ka):
571  self.__getitem__(*a, **ka)
572  # }}}
573  # }}}
574 
582  def _send(self, type, object):
583  if DEBUG > 1:
584  log('sending:' + repr(type) + repr(object))
585  Websocket.send(self, json.dumps((type, object)))
586  # }}}
587 
593  def _parse_frame(self, frame):
594  try:
595  # Don't choke on Chrome's junk at the end of packets.
596  data = json.JSONDecoder().raw_decode(frame)[0]
597  except ValueError:
598  log('non-json frame: %s' % repr(frame))
599  return (None, 'non-json frame')
600  if type(data) is not list or len(data) != 2 or not isinstance(data[0], str):
601  log('invalid frame %s' % repr(data))
602  return (None, 'invalid frame')
603  if data[0] == 'call':
604  if not isinstance(data[1], list):
605  log('invalid call frame (no list) %s' % repr(data))
606  return (None, 'invalid frame')
607  if len(data[1]) != 4:
608  log('invalid call frame (list length is not 4) %s' % repr(data))
609  return (None, 'invalid frame')
610  if (data[1][0] is not None and not isinstance(data[1][0], int)):
611  log('invalid call frame (invalid id) %s' % repr(data))
612  return (None, 'invalid frame')
613  if not isinstance(data[1][1], str):
614  log('invalid call frame (no string target) %s' % repr(data))
615  return (None, 'invalid frame')
616  if not isinstance(data[1][2], list):
617  log('invalid call frame (no list args) %s' % repr(data))
618  return (None, 'invalid frame')
619  if not isinstance(data[1][3], dict):
620  log('invalid call frame (no dict kwargs) %s' % repr(data))
621  return (None, 'invalid frame')
622  if (self._delayed_calls_delayed_calls is None and (not hasattr(self._target_target, data[1][1]) or not isinstance(getattr(self._target_target, data[1][1]), collections.Callable))):
623  log('invalid call frame (no callable) %s' % repr(data))
624  return (None, 'invalid frame')
625  elif data[0] not in ('error', 'return'):
626  log('invalid frame type %s' % repr(data))
627  return (None, 'invalid frame')
628  return data
629  # }}}
630 
636  def _recv(self, frame):
637  data = self._parse_frame_parse_frame(frame)
638  if DEBUG > 1:
639  log('packet received: %s' % repr(data))
640  if data[0] is None:
641  self._send_send('error', data[1])
642  return
643  elif data[0] == 'error':
644  if DEBUG > 0:
645  traceback.print_stack()
646  if self._error_error is not None:
647  self._error_error(data[1])
648  else:
649  raise ValueError(data[1])
650  elif data[0] == 'event':
651  # Do nothing with this; the packet is already logged if DEBUG > 1.
652  return
653  elif data[0] == 'return':
654  assert data[1][0] in RPC._calls
655  RPC._calls[data[1][0]] (data[1][1])
656  return
657  elif data[0] == 'call':
658  try:
659  if self._delayed_calls_delayed_calls is not None:
660  self._delayed_calls_delayed_calls.append(data[1])
661  else:
662  self._call_call(data[1][0], data[1][1], data[1][2], data[1][3])
663  except:
664  traceback.print_exc()
665  log('error: %s' % str(sys.exc_info()[1]))
666  self._send_send('error', traceback.format_exc())
667  else:
668  self._send_send('error', 'invalid RPC command')
669  # }}}
670 
681  def _call(self, reply, member, a, ka):
682  call((lambda ret: self._send_send('return', (reply, ret))) if reply is not None else None, getattr(self._target_target, member), *a, **ka)
683  # }}}
684 
688  def __getattr__(self, attr):
689  if attr.startswith('_'):
690  raise AttributeError('invalid RPC function name %s' % attr)
691  return RPC._wrapper(self, attr)
692  # }}}
693 # }}}
694 
695 
700 def _activate_all():
701  if _activation[0] is not None:
702  for s in _activation[0]:
703  s()
704  _activation[0].clear()
705  _activation[1] = None
706  return False
707 # }}}
708 
709 
716 class _Httpd_connection:
717 
729  def __init__(self, server, socket, websocket = Websocket, proxy = (), error = None):
730  self.server = server
731  self.socketsocket = socket
732  self.websocket = websocket
733  self.proxy = (proxy,) if isinstance(proxy, str) else proxy
734  self.error = error
735  self.headers = {}
736  self.address = None
737  self.socketsocket.disconnect_cb(lambda socket, data: b'') # Ignore disconnect until it is a WebSocket.
738  self.socketsocket.readlines(self._line)
739  #log('Debug: new connection from %s\n' % repr(self.socket.remote))
740  # }}}
741  def _line(self, l):
742  if DEBUG > 4:
743  log('Debug: Received line: %s' % l)
744  if self.address is not None:
745  if not l.strip():
746  self._handle_headers()
747  return
748  try:
749  key, value = l.split(':', 1)
750  except ValueError:
751  log('Invalid header line: %s' % l)
752  return
753  self.headers[key.lower()] = value.strip()
754  return
755  else:
756  try:
757  self.method, url, self.standard = l.split()
758  for prefix in self.proxy:
759  if url.startswith('/' + prefix + '/') or url == '/' + prefix:
760  self.prefix = '/' + prefix
761  break
762  else:
763  self.prefix = ''
764  address = urlparse(url)
765  path = address.path[len(self.prefix):] or '/'
766  self.url = path + url[len(address.path):]
767  self.address = urlparse(self.url)
768  self.query = parse_qs(self.address.query)
769  except:
770  traceback.print_exc()
771  self.server.reply(self, 400, close = True)
772  return
773  # }}}
774  def _handle_headers(self):
775  if DEBUG > 4:
776  log('Debug: handling headers')
777  is_websocket = 'connection' in self.headers and 'upgrade' in self.headers and 'upgrade' in self.headers['connection'].lower() and 'websocket' in self.headers['upgrade'].lower()
778  self.datadata = {}
779  self.datadata['url'] = self.url
780  self.datadata['address'] = self.address
781  self.datadata['query'] = self.query
782  self.datadata['headers'] = self.headers
783  msg = self.server.auth_message(self, is_websocket) if callable(self.server.auth_message) else self.server.auth_message
784  if msg:
785  if 'authorization' not in self.headers:
786  self.server.reply(self, 401, headers = {'WWW-Authenticate': 'Basic realm="%s"' % msg.replace('\n', ' ').replace('\r', ' ').replace('"', "'")}, close = True)
787  return
788  else:
789  auth = self.headers['authorization'].split(None, 1)
790  if auth[0].lower() != 'basic':
791  self.server.reply(self, 400, close = True)
792  return
793  pwdata = base64.b64decode(auth[1].encode('utf-8')).decode('utf-8', 'replace').split(':', 1)
794  if len(pwdata) != 2:
795  self.server.reply(self, 400, close = True)
796  return
797  self.datadata['user'] = pwdata[0]
798  self.datadata['password'] = pwdata[1]
799  if not self.server.authenticate(self):
800  self.server.reply(self, 401, headers = {'WWW-Authenticate': 'Basic realm="%s"' % msg.replace('\n', ' ').replace('\r', ' ').replace('"', "'")}, close = True)
801  return
802  if not is_websocket:
803  if DEBUG > 4:
804  log('Debug: not a websocket')
805  self.body = self.socketsocket.unread()
806  if self.method.upper() == 'POST':
807  if 'content-type' not in self.headers or self.headers['content-type'].lower().split(';')[0].strip() != 'multipart/form-data':
808  log('Invalid Content-Type for POST; must be multipart/form-data (not %s)\n' % (self.headers['content-type'] if 'content-type' in self.headers else 'undefined'))
809  self.server.reply(self, 500, close = True)
810  return
811  args = self._parse_args(self.headers['content-type'])[1]
812  if 'boundary' not in args:
813  log('Invalid Content-Type for POST: missing boundary in %s\n' % (self.headers['content-type'] if 'content-type' in self.headers else 'undefined'))
814  self.server.reply(self, 500, close = True)
815  return
816  self.boundary = b'\r\n' + b'--' + args['boundary'].encode('utf-8') + b'\r\n'
817  self.endboundary = b'\r\n' + b'--' + args['boundary'].encode('utf-8') + b'--\r\n'
818  self.post_state = None
819  self.post = [{}, {}]
820  self.socketsocket.read(self._post)
821  self._post(b'')
822  else:
823  try:
824  if not self.server.page(self):
825  self.socketsocket.close()
826  except:
827  if DEBUG > 0:
828  traceback.print_exc()
829  log('exception: %s\n' % repr(sys.exc_info()[1]))
830  try:
831  self.server.reply(self, 500, close = True)
832  except:
833  self.socketsocket.close()
834  return
835  # Websocket.
836  if self.method.upper() != 'GET' or 'sec-websocket-key' not in self.headers:
837  if DEBUG > 2:
838  log('Debug: invalid websocket')
839  self.server.reply(self, 400, close = True)
840  return
841  newkey = base64.b64encode(hashlib.sha1(self.headers['sec-websocket-key'].strip().encode('utf-8') + b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11').digest()).decode('utf-8')
842  headers = {'Sec-WebSocket-Accept': newkey, 'Connection': 'Upgrade', 'Upgrade': 'websocket', 'Sec-WebSocket-Version': '13'}
843  self.server.reply(self, 101, None, None, headers, close = False)
844  self.websocket(None, recv = self.server.recv, url = None, socket = self.socketsocket, error = self.error, mask = (None, False), websockets = self.server.websockets, data = self.datadata, real_remote = self.headers.get('x-forwarded-for'))
845  # }}}
846  def _parse_headers(self, message):
847  lines = []
848  pos = 0
849  while True:
850  p = message.index(b'\r\n', pos)
851  ln = message[pos:p].decode('utf-8', 'replace')
852  pos = p + 2
853  if ln == '':
854  break
855  if ln[0] in ' \t':
856  if len(lines) == 0:
857  log('header starts with continuation')
858  else:
859  lines[-1] += ln
860  else:
861  lines.append(ln)
862  ret = {}
863  for ln in lines:
864  if ':' not in ln:
865  log('ignoring header line without ":": %s' % ln)
866  continue
867  key, value = [x.strip() for x in ln.split(':', 1)]
868  if key.lower() in ret:
869  log('duplicate key in header: %s' % key)
870  ret[key.lower()] = value
871  return ret, message[pos:]
872  # }}}
873  def _parse_args(self, header):
874  if ';' not in header:
875  return (header.strip(), {})
876  pos = header.index(';') + 1
877  main = header[:pos].strip()
878  ret = {}
879  while pos < len(header):
880  if '=' not in header[pos:]:
881  if header[pos:].strip() != '':
882  log('header argument %s does not have a value' % header[pos:].strip())
883  return main, ret
884  p = header.index('=', pos)
885  key = header[pos:p].strip().lower()
886  pos = p + 1
887  value = ''
888  quoted = False
889  while True:
890  first = (len(header), None)
891  if not quoted and ';' in header[pos:]:
892  s = header.index(';', pos)
893  if s < first[0]:
894  first = (s, ';')
895  if '"' in header[pos:]:
896  q = header.index('"', pos)
897  if q < first[0]:
898  first = (q, '"')
899  if '\\' in header[pos:]:
900  b = header.index('\\', pos)
901  if b < first[0]:
902  first = (b, '\\')
903  value += header[pos:first[0]]
904  pos = first[0] + 1
905  if first[1] == ';' or first[1] is None:
906  break
907  if first[1] == '\\':
908  value += header[pos]
909  pos += 1
910  continue
911  quoted = not quoted
912  ret[key] = value
913  return main, ret
914  # }}}
915  def _post(self, data):
916  #log('post body %s data %s' % (repr(self.body), repr(data)))
917  self.body += data
918  if self.post_state is None:
919  # Waiting for first boundary.
920  if self.boundary not in b'\r\n' + self.body:
921  if self.endboundary in b'\r\n' + self.body:
922  self._finish_post()
923  return
924  self.body = b'\r\n' + self.body
925  self.body = self.body[self.body.index(self.boundary) + len(self.boundary):]
926  self.post_state = 0
927  # Fall through.
928  a = 20
929  while True:
930  if self.post_state == 0:
931  # Reading part headers.
932  if b'\r\n\r\n' not in self.body:
933  return
934  headers, self.body = self._parse_headers(self.body)
935  self.post_state = 1
936  if 'content-type' not in headers:
937  post_type = ('text/plain', {'charset': 'us-ascii'})
938  else:
939  post_type = self._parse_args(headers['content-type'])
940  if 'content-transfer-encoding' not in headers:
941  self.post_encoding = '7bit'
942  else:
943  self.post_encoding = self._parse_args(headers['content-transfer-encoding'])[0].lower()
944  # Handle decoding of the data.
945  if self.post_encoding == 'base64':
946  self._post_decoder = self._base64_decoder
947  elif self.post_encoding == 'quoted-printable':
948  self._post_decoder = self._quopri_decoder
949  else:
950  self._post_decoder = lambda x, final: (x, b'')
951  if 'content-disposition' in headers:
952  args = self._parse_args(headers['content-disposition'])[1]
953  if 'name' in args:
954  self.post_name = args['name']
955  else:
956  self.post_name = None
957  if 'filename' in args:
958  fd, self.post_file = tempfile.mkstemp()
959  self.post_handle = os.fdopen(fd, 'wb')
960  if self.post_name not in self.post[1]:
961  self.post[1][self.post_name] = []
962  self.post[1][self.post_name].append((self.post_file, args['filename'], headers, post_type))
963  else:
964  self.post_handle = None
965  else:
966  self.post_name = None
967  if self.post_handle is None:
968  self.post[0][self.post_name] = [b'', headers, post_type]
969  # Fall through.
970  if self.post_state == 1:
971  # Reading part body.
972  if self.endboundary in self.body:
973  p = self.body.index(self.endboundary)
974  else:
975  p = None
976  if self.boundary in self.body and (p is None or self.body.index(self.boundary) < p):
977  self.post_state = 0
978  rest = self.body[self.body.index(self.boundary) + len(self.boundary):]
979  self.body = self.body[:self.body.index(self.boundary)]
980  elif p is not None:
981  self.body = self.body[:p]
982  self.post_state = None
983  else:
984  if len(self.body) <= len(self.boundary):
985  break
986  rest = self.body[-len(self.boundary):]
987  self.body = self.body[:-len(rest)]
988  decoded, self.body = self._post_decoder(self.body, self.post_state != 1)
989  if self.post_handle is not None:
990  self.post_handle.write(decoded)
991  if self.post_state != 1:
992  self.post_handle.close()
993  else:
994  self.post[0][self.post_name][0] += decoded
995  if self.post_state != 1:
996  if self.post[0][self.post_name][2][0] == 'text/plain':
997  self.post[0][self.post_name][0] = self.post[0][self.post_name][0].decode(self.post[0][self.post_name][2][1].get('charset', 'utf-8'), 'replace')
998  if self.post_state is None:
999  self._finish_post()
1000  return
1001  self.body += rest
1002  # }}}
1003  def _finish_post(self):
1004  if not self.server.post(self):
1005  self.socketsocket.close()
1006  for f in self.post[1]:
1007  for g in self.post[1][f]:
1008  os.remove(g[0])
1009  del self.post
1010  # }}}
1011  def _base64_decoder(self, data, final):
1012  ret = b''
1013  pos = 0
1014  table = b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/='
1015  current = []
1016  while len(data) >= pos + 4 - len(current):
1017  c = data[pos]
1018  pos += 1
1019  if c not in table:
1020  if c not in b'\r\n':
1021  log('ignoring invalid character %s in base64 string' % c)
1022  continue
1023  current.append(table.index(c))
1024  if len(current) == 4:
1025  # decode
1026  ret += bytes((current[0] << 2 | current[1] >> 4,))
1027  if current[2] != 65:
1028  ret += bytes((((current[1] << 4) & 0xf0) | current[2] >> 2,))
1029  if current[3] != 65:
1030  ret += bytes((((current[2] << 6) & 0xc0) | current[3],))
1031  return (ret, data[pos:])
1032  # }}}
1033  def _quopri_decoder(self, data, final):
1034  ret = b''
1035  pos = 0
1036  while b'=' in data[pos:-2]:
1037  p = data.index(b'=', pos)
1038  ret += data[:p]
1039  if data[p + 1:p + 3] == b'\r\n':
1040  ret += b'\n'
1041  pos = p + 3
1042  continue
1043  if any(x not in b'0123456789ABCDEFabcdef' for x in data[p + 1:p + 3]):
1044  log('invalid escaped sequence in quoted printable: %s' % data[p:p + 3].encode('utf-8', 'replace'))
1045  pos = p + 1
1046  continue
1047  ret += bytes((int(data[p + 1:p + 3], 16),))
1048  pos = p + 3
1049  if final:
1050  ret += data[pos:]
1051  pos = len(data)
1052  elif len(pos) >= 2:
1053  ret += data[pos:-2]
1054  pos = len(data) - 2
1055  return (ret, data[pos:])
1056  # }}}
1057 # }}}
1058 
1062 class Httpd:
1063 
1076  def __init__(self, port, recv = None, httpdirs = None, server = None, proxy = (), http_connection = _Httpd_connection, websocket = Websocket, error = None, *a, **ka):
1077 
1078  self.recvrecv = recv
1079  self._http_connection_http_connection = http_connection
1080 
1082  self.httpdirshttpdirs = httpdirs
1083  self._proxy_proxy = proxy
1084  self._websocket_websocket = websocket
1085  self._error_error = error if error is not None else lambda msg: print(msg)
1086 
1088  self.extsexts = {}
1089  # Automatically add all extensions for which a mime type exists.
1090  try:
1091  exts = {}
1092  with open('/etc/mime.types') as f:
1093  for ln in f:
1094  items = ln.split()
1095  for ext in items[1:]:
1096  if ext not in exts:
1097  exts[ext] = items[0]
1098  else:
1099  # Multiple registration: don't choose one.
1100  exts[ext] = False
1101  except FileNotFoundError:
1102  # This is probably a Windows system; use some defaults.
1103  exts = { 'html': 'text/html', 'css': 'text/css', 'js': 'text/javascript', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'png': 'image/png', 'bmp': 'image/bmp', 'gif': 'image/gif', 'pdf': 'application/pdf', 'svg': 'image/svg+xml', 'txt': 'text/plain'}
1104  for ext in exts:
1105  if exts[ext] is not False:
1106  if exts[ext].startswith('text/') or exts[ext] == 'application/javascript':
1107  self.handle_exthandle_ext(ext, exts[ext] + ';charset=utf-8')
1108  else:
1109  self.handle_exthandle_ext(ext, exts[ext])
1110 
1111  self.websocketswebsockets = set()
1112  if server is None:
1113 
1114  self.serverserver = network.Server(port, self, *a, **ka)
1115  else:
1116  self.serverserver = server
1117  # }}}
1118 
1124  def __call__(self, socket):
1125  return self._http_connection_http_connection(self, socket, proxy = self._proxy_proxy, websocket = self._websocket_websocket, error = self._error_error)
1126  # }}}
1127 
1136  def handle_ext(self, ext, mime):
1137  self.extsexts[ext] = lambda socket, message: self.replyreply(socket, 200, message, mime)
1138  # }}}
1139  # Authentication. {{{
1140 
1141  auth_message = None
1142 
1173  def authenticate(self, connection):
1174  return True
1175  # }}}
1176  # }}}
1177  # The following function can be called by the overloaded page function.
1178 
1201  def reply(self, connection, code, message = None, content_type = None, headers = None, close = False): # Send HTTP status code and headers, and optionally a message. {{{
1202  assert code in httpcodes
1203  #log('Debug: sending reply %d %s for %s\n' % (code, httpcodes[code], connection.address.path))
1204  connection.socket.send(('HTTP/1.1 %d %s\r\n' % (code, httpcodes[code])).encode('utf-8'))
1205  if headers is None:
1206  headers = {}
1207  if message is None and code != 101:
1208  assert content_type is None
1209  content_type = 'text/html; charset=utf-8'
1210  message = ('<!DOCTYPE html><html><head><title>%d: %s</title></head><body><h1>%d: %s</h1></body></html>' % (code, httpcodes[code], code, httpcodes[code])).encode('utf-8')
1211  if close and 'Connection' not in headers:
1212  headers['Connection'] = 'close'
1213  if content_type is not None:
1214  headers['Content-Type'] = content_type
1215  headers['Content-Length'] = '%d' % len(message)
1216  else:
1217  assert code == 101
1218  message = b''
1219  connection.socket.send((''.join(['%s: %s\r\n' % (x, headers[x]) for x in headers]) + '\r\n').encode('utf-8') + message)
1220  if close:
1221  connection.socket.close()
1222  # }}}
1223  # If httpdirs is not given, or special handling is desired, this can be overloaded.
1224 
1237  def page(self, connection, path = None): # A non-WebSocket page was requested. Use connection.address, connection.method, connection.query, connection.headers and connection.body (which should be empty) to find out more. {{{
1238  if self.httpdirshttpdirs is None:
1239  self.replyreply(connection, 501)
1240  return
1241  if path is None:
1242  path = connection.address.path
1243  if path == '/':
1244  address = 'index'
1245  else:
1246  address = '/' + unquote(path) + '/'
1247  while '/../' in address:
1248  # Don't handle this; just ignore it.
1249  pos = address.index('/../')
1250  address = address[:pos] + address[pos + 3:]
1251  address = address[1:-1]
1252  if '.' in address.rsplit('/', 1)[-1]:
1253  base, ext = address.rsplit('.', 1)
1254  base = base.strip('/')
1255  if ext not in self.extsexts and None not in self.extsexts:
1256  log('not serving unknown extension %s' % ext)
1257  self.replyreply(connection, 404)
1258  return
1259  for d in self.httpdirshttpdirs:
1260  filename = os.path.join(d, base + os.extsep + ext)
1261  if os.path.exists(filename):
1262  break
1263  else:
1264  log('file %s not found in %s' % (base + os.extsep + ext, ', '.join(self.httpdirshttpdirs)))
1265  self.replyreply(connection, 404)
1266  return
1267  else:
1268  base = address.strip('/')
1269  for ext in self.extsexts:
1270  for d in self.httpdirshttpdirs:
1271  filename = os.path.join(d, base if ext is None else base + os.extsep + ext)
1272  if os.path.exists(filename):
1273  break
1274  else:
1275  continue
1276  break
1277  else:
1278  log('no file %s (with supported extension) found in %s' % (base, ', '.join(self.httpdirshttpdirs)))
1279  self.replyreply(connection, 404)
1280  return
1281  return self.extsexts[ext](connection, open(filename, 'rb').read())
1282  # }}}
1283 
1293  def post(self, connection): # A non-WebSocket page was requested with POST. Same as page() above, plus connection.post, which is a dict of name:(headers, sent_filename, local_filename). When done, the local files are unlinked; remove the items from the dict to prevent this. The default is to return an error (so POST cannot be used to retrieve static pages!) {{{
1294  log('Warning: ignoring POST request.')
1295  self.replyreply(connection, 501)
1296  return False
1297  # }}}
1298 # }}}
1299 
1302  class _Broadcast:
1303  def __init__(self, server, group = None):
1304  self.serverserver = server
1305  self.group = group
1306  def __getitem__(self, item):
1307  return RPChttpd._Broadcast(self.serverserver, item)
1308  def __getattr__(self, key):
1309  if key.startswith('_'):
1310  raise AttributeError('invalid member name')
1311  def impl(*a, **ka):
1312  for c in self.serverserver.websockets.copy():
1313  if self.group is None or self.group in c.groups:
1314  getattr(c, key).event(*a, **ka)
1315  return impl
1316  # }}}
1317 
1331  def __init__(self, port, target, *a, **ka):
1332 
1344  self.broadcastbroadcast = RPChttpd._Broadcast(self)
1345  if 'log' in ka:
1346  name = ka.pop('log')
1347  if name:
1348  global DEBUG
1349  if DEBUG < 2:
1350  DEBUG = 2
1351  if os.path.isdir(name):
1352  n = os.path.join(name, time.strftime('%F %T%z'))
1353  old = n
1354  i = 0
1355  while os.path.exists(n):
1356  i += 1
1357  n = '%s.%d' % (old, i)
1358  else:
1359  n = name
1360  try:
1361  f = open(n, 'a')
1362  if n != name:
1363  sys.stderr.write('Logging to %s\n' % n)
1364  except IOError:
1365  fd, n = tempfile.mkstemp(prefix = os.path.basename(n) + '-' + time.strftime('%F %T%z') + '-', text = True)
1366  sys.stderr.write('Opening file %s failed, using tempfile instead: %s\n' % (name, n))
1367  f = os.fdopen(fd, 'a')
1368  stderr_fd = sys.stderr.fileno()
1369  os.close(stderr_fd)
1370  os.dup2(f.fileno(), stderr_fd)
1371  log('Start logging to %s, commandline = %s' % (n, repr(sys.argv)))
1372  Httpd.__init__(self, port, target, websocket = RPC, *a, **ka)
1373  # }}}
1374 # }}}
1375 
1379 def fgloop(*a, **ka):
1380  _activate_all()
1381  return network.fgloop(*a, **ka)
1382 # }}}
1383 
1387 def bgloop(*a, **ka):
1388  _activate_all()
1389  return network.bgloop(*a, **ka)
1390 # }}}
1391 
1395 def iteration(*a, **ka):
1396  _activate_all()
1397  return network.iteration(*a, **ka)
1398 # }}}
def page(self, connection, path=None)
Serve a non-websocket page.
Definition: websocketd.py:1237
recv
Communication object for new websockets.
Definition: websocketd.py:1078
httpdirs
Sequence of directories that that are searched to serve.
Definition: websocketd.py:1082
def post(self, connection)
Handle POST request.
Definition: websocketd.py:1293
def reply(self, connection, code, message=None, content_type=None, headers=None, close=False)
Reply to a request for a document.
Definition: websocketd.py:1201
server
network.Server object.
Definition: websocketd.py:1114
def authenticate(self, connection)
Handle user authentication.
Definition: websocketd.py:1173
def handle_ext(self, ext, mime)
Add file extension to handle successfully.
Definition: websocketd.py:1136
exts
Extensions which are handled from httpdirs.
Definition: websocketd.py:1088
websockets
Currently connected websocket connections.
Definition: websocketd.py:1111
HTTP server.
Definition: websocketd.py:1062
groups
Groups are used to do selective broadcast() events.
Definition: websocketd.py:518
Remote Procedure Call over Websocket.
Definition: websocketd.py:480
broadcast
Function to send an event to some or all connected clients.
Definition: websocketd.py:1344
Http server which serves websockets that implement RPC.
Definition: websocketd.py:1301
def closed(self)
This function does nothing by default, but can be overridden by the application.
Definition: websocketd.py:426
def opened(self)
This function does nothing by default, but can be overridden by the application.
Definition: websocketd.py:419
def send(self, data, opcode=1)
Send a Websocket frame to the remote end of the connection.
Definition: websocketd.py:363
def close(self)
Send close request, and close the connection.
Definition: websocketd.py:408
def ping(self, data=b'')
Send a ping, return if a pong was received since last ping.
Definition: websocketd.py:399
Main class implementing the websocket protocol.
Definition: websocketd.py:108
def call(reply, target, *a, **ka)
Make a call to a function or generator.
Definition: websocketd.py:447
def bgloop(*a, **ka)
Activate all websockets and start the main loop in the background.
Definition: websocketd.py:1387
def iteration(*a, **ka)
Activate all websockets and run one loop iteration.
Definition: websocketd.py:1395
def fgloop(*a, **ka)
Activate all websockets and start the main loop.
Definition: websocketd.py:1379