Compare commits

...

38 Commits

Author SHA1 Message Date
Jannes Höke 57efde5e0f Bump version to v4.3 2016-06-28 13:35:42 +02:00
Noam Meltzer e0539d5992 Merge pull request #327 from python-telegram-bot/urllib3
Urllib3
2016-06-20 06:30:25 +03:00
leandrotoledo b41f7e3e79 Code style with latest yapf 2016-06-19 17:50:02 -04:00
Jannes Höke caf72ca490 Merge branch 'urllib3' of github.com:python-telegram-bot/python-telegram-bot into urllib3 2016-06-19 23:46:53 +02:00
Jannes Höke 7635bc0eec comments, lock thread pool, while 1 and snake_case everywhere 2016-06-19 23:46:34 +02:00
Jannes Höke 703bece155 set loglevel of urllib3 to WARNING by default 2016-06-19 23:40:34 +02:00
Jannes Höke 949f4a4fbd update requirements: minimum versions of urllib3 and future 2016-06-19 23:39:00 +02:00
leandrotoledo 05522e4321 Merge remote-tracking branch 'origin/master' into urllib3 2016-06-19 17:38:19 -04:00
leandrotoledo 4f101a79bb Update travis yapf [ci skip] 2016-06-19 17:08:12 -04:00
Noam Meltzer 5b91194cc7 new yapf version, new cosmetic fixes 2016-06-18 20:05:10 +03:00
Noam Meltzer 494a7ec1e4 ypaf fixes 2016-06-18 19:57:11 +03:00
Noam Meltzer fc05d3a626 switch back to PoolManager
telegram servers might send a reponse with HTTP 302 (redirect) to
another hostname. in such case HTTPSConnectionPool will fail to do the
job
2016-06-18 19:50:18 +03:00
Noam Meltzer bc77c845ea test_updater: make sure that conpool is stopped before setting updater
even for the first unitest, it might come after another unitests from
another file which had already init the conpool.
2016-06-18 09:53:08 +03:00
Noam Meltzer a814e9de6b make sure to stop conpool between sensitive unitests 2016-06-18 00:50:44 +03:00
Noam Meltzer d37b6d6735 make sure to stop Updater after the test_createBot is over 2016-06-18 00:01:36 +03:00
Noam Meltzer e479c7f25e type hinting (cosmetic fix) 2016-06-17 23:59:32 +03:00
Noam Meltzer a30411c9fa make sure to remove the stopped dispatcher threads from ASYNC_THREADS 2016-06-17 23:58:22 +03:00
Noam Meltzer 881d1d0e25 fix/hack Updater.stop() not working on extreme cases
during test_bootstrap_retries_fail() there is an exception raised (by
design): TelegramError('test')

For a reason I haven't managed to pinpoint the above exception in its
precise timing caused the Updater to be left in a state which is
'self.running == False', but the dispatcher threads already initialized.
This patch identifies this extreme case and makes sure to go over the
stop procedure.
2016-06-17 23:53:18 +03:00
Noam Meltzer cb6ddfded5 Merge remote-tracking branch 'origin/master' into urllib3 2016-06-17 17:54:04 +03:00
Noam Meltzer bda0244ed8 updater: fix print in log 2016-06-17 16:52:25 +03:00
Rahiel Kasim 9338f93d24 Merge pull request #325 from python-telegram-bot/examples
more robust echobot, let roboed go
2016-06-12 17:08:15 +02:00
Rahiel Kasim e10fa66286 echobot: simplify handling messageless updates 2016-06-12 17:06:03 +02:00
Rahiel Kasim deb9de0ba0 README: remove roboed, rename example 2016-06-12 16:58:18 +02:00
Rahiel Kasim 94fd6851ab more robust echobot, let roboed go 2016-06-12 15:30:56 +02:00
Noam Meltzer 1f5601dae2 fix SyntaxWarning 2016-06-01 22:38:08 +03:00
Noam Meltzer 3608c2bbe5 dispatcher: if connection pool is already initialized raise exception
this will better protect the user from wrong usage
2016-06-01 22:30:34 +03:00
Noam Meltzer c28763c5be dispatcher: cosmetic fix 2016-06-01 22:30:33 +03:00
Noam Meltzer dd8b6219b9 dispatcher: a little performance improvment 2016-06-01 22:30:33 +03:00
Noam Meltzer 78f9bdcac9 dispatcher: pep8 style fix
globals are supposed to be upper case
2016-06-01 22:30:09 +03:00
Jannes Höke 1ff348adbb issue warning if connection pool was initialized before Dispatcher 2016-05-31 13:47:43 +02:00
Jannes Höke 6b457bada5 use keepalive for connection pool 2016-05-31 13:45:43 +02:00
Jannes Höke 74283bd414 use HTTPSConnectionPool instead of PoolManager 2016-05-30 17:12:50 +02:00
Jannes Höke 41f6591ac6 more sensible logging 2016-05-30 17:12:27 +02:00
Jannes Höke dd91ce1f39 use single queue for thread pool, initialize connection pool with n+3 2016-05-30 13:09:23 +02:00
Jannes Höke 57759d8e6d [drunk] use actual thread pool and queue new functions into the pool instead of starting new threads every time 2016-05-30 03:16:33 +02:00
Noam Meltzer 574fc8cddf urllib3: validate https certificate 2016-05-30 01:05:19 +03:00
Noam Meltzer b040568b07 test_bot: fix for urllib3 compatibility 2016-05-30 01:05:19 +03:00
Noam Meltzer 3076dfc086 use urllib3 instead of urllib(2) 2016-05-30 01:05:19 +03:00
14 changed files with 183 additions and 166 deletions
+1 -1
View File
@@ -4,7 +4,7 @@
- id: yapf
files: ^(telegram|tests)/.*\.py$
- repo: git://github.com/pre-commit/pre-commit-hooks
sha: adbb569fe9a64ad9bce3b53a77f1bc39ef31f682
sha: 6dfcb89af3c9b4d172cc2e5a8a2fa0f54615a338
hooks:
- id: flake8
files: ^telegram/.*\.py$
+8
View File
@@ -1,3 +1,11 @@
**2016-06-28**
*Released 4.3*
- Use ``urllib3.PoolManager`` for connection re-use
- Rewrite ``run_async`` decorator to re-use threads
- New requirements: ``urllib3`` and ``certifi``
**2016-06-10**
*Released 4.2.1*
+1 -5
View File
@@ -129,11 +129,7 @@ code and building on top of it.
- `timerbot <https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/timerbot.py>`_ uses the ``JobQueue`` to send timed messages.
Examples using only the pure API:
- `echobot <https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/legacy/echobot.py>`_ replies back messages.
- `roboed <https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/legacy/roboed.py>`_ talks to `Robô Ed <http://www.ed.conpet.gov.br/br/converse.php>`_.
- `echobot <https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/legacy/echobot.py>`_ uses only the pure API to echo messages.
Look at the examples on the `wiki <https://github.com/python-telegram-bot/python-telegram-bot/wiki/Examples>`_ to see other bots the community has built.
@@ -1,16 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Simple Bot to reply to Telegram messages
# Simple Bot to reply to Telegram messages. This is built on the API wrapper, see
# echobot2.py to see the same example built on the telegram.ext bot framework.
# This program is dedicated to the public domain under the CC0 license.
import logging
import telegram
from telegram.error import NetworkError, Unauthorized
from time import sleep
update_id = None
def main():
global update_id
# Telegram Bot Authorization Token
bot = telegram.Bot('TOKEN')
@@ -25,7 +28,7 @@ def main():
while True:
try:
update_id = echo(bot, update_id)
echo(bot)
except NetworkError:
sleep(1)
except Unauthorized:
@@ -33,20 +36,17 @@ def main():
update_id += 1
def echo(bot, update_id):
def echo(bot):
global update_id
# Request updates after the last update_id
for update in bot.getUpdates(offset=update_id, timeout=10):
# chat_id is required to reply to any message
chat_id = update.message.chat_id
update_id = update.update_id + 1
message = update.message.text
if message:
if update.message: # your bot can receive updates without messages
# Reply to the message
bot.sendMessage(chat_id=chat_id, text=message)
return update_id
bot.sendMessage(chat_id=chat_id, text=update.message.text)
if __name__ == '__main__':
-38
View File
@@ -1,38 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
#
# Robô Ed Telegram Bot
# This program is dedicated to the public domain under the CC0 license.
import logging
import telegram
import urllib
def main():
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
bot = telegram.Bot('TOKEN') # Telegram Bot Authorization Token
LAST_UPDATE_ID = bot.getUpdates()[-1].update_id # Get lastest update
while True:
for update in bot.getUpdates(offset=LAST_UPDATE_ID, timeout=10):
text = update.message.text
chat_id = update.message.chat.id
update_id = update.update_id
if text:
roboed = ed(text) # Ask something to Robô Ed
bot.sendMessage(chat_id=chat_id, text=roboed)
LAST_UPDATE_ID = update_id + 1
def ed(text):
url = 'http://www.ed.conpet.gov.br/mod_perl/bot_gateway.cgi?server=0.0.0.0%3A8085&charset_post=utf-8&charset=utf-8&pure=1&js=0&tst=1&msg=' + text
data = urllib.urlopen(url).read()
return data.strip()
if __name__ == '__main__':
main()
+3 -1
View File
@@ -1 +1,3 @@
future
future>=0.15.2
urllib3>=1.8.3
certifi
+1 -1
View File
@@ -82,7 +82,7 @@ from .update import Update
from .bot import Bot
__author__ = 'devs@python-telegram-bot.org'
__version__ = '4.2.1'
__version__ = '4.3'
__all__ = ['Audio', 'Bot', 'Chat', 'ChatMember', 'ChatAction', 'ChosenInlineResult',
'CallbackQuery', 'Contact', 'Document', 'Emoji', 'File', 'ForceReply',
'InlineKeyboardButton', 'InlineKeyboardMarkup', 'InlineQuery', 'InlineQueryResult',
+48 -37
View File
@@ -20,24 +20,47 @@
import logging
from functools import wraps
from threading import Thread, BoundedSemaphore, Lock, Event, current_thread
from threading import Thread, Lock, Event, current_thread
from time import sleep
from queue import Queue, Empty
from queue import Empty
from future.builtins import range
from telegram import (TelegramError, NullHandler)
from telegram.utils import request
from telegram.ext.handler import Handler
from telegram.utils.deprecate import deprecate
logging.getLogger(__name__).addHandler(NullHandler())
semaphore = None
async_threads = set()
ASYNC_QUEUE = Queue()
ASYNC_THREADS = set()
""":type: set[Thread]"""
async_lock = Lock()
ASYNC_LOCK = Lock() # guards ASYNC_THREADS
DEFAULT_GROUP = 0
def _pooled():
"""
A wrapper to run a thread in a thread pool
"""
while 1:
try:
func, args, kwargs = ASYNC_QUEUE.get()
# If unpacking fails, the thread pool is being closed from Updater._join_async_threads
except TypeError:
logging.getLogger(__name__).debug("Closing run_async thread %s/%d" %
(current_thread().getName(), len(ASYNC_THREADS)))
break
try:
func(*args, **kwargs)
except:
logging.getLogger(__name__).exception("run_async function raised exception")
def run_async(func):
"""
Function decorator that will run the function in a new thread.
@@ -53,30 +76,11 @@ def run_async(func):
# set a threading.Event to notify caller thread
@wraps(func)
def pooled(*pargs, **kwargs):
"""
A wrapper to run a thread in a thread pool
"""
try:
result = func(*pargs, **kwargs)
finally:
semaphore.release()
with async_lock:
async_threads.remove(current_thread())
return result
@wraps(func)
def async_func(*pargs, **kwargs):
def async_func(*args, **kwargs):
"""
A wrapper to run a function in a thread
"""
thread = Thread(target=pooled, args=pargs, kwargs=kwargs)
semaphore.acquire()
with async_lock:
async_threads.add(thread)
thread.start()
return thread
ASYNC_QUEUE.put((func, args, kwargs))
return async_func
@@ -107,11 +111,18 @@ class Dispatcher(object):
self.__stop_event = Event()
self.__exception_event = exception_event or Event()
global semaphore
if not semaphore:
semaphore = BoundedSemaphore(value=workers)
else:
self.logger.debug('Semaphore already initialized, skipping.')
with ASYNC_LOCK:
if not ASYNC_THREADS:
if request.is_con_pool_initialized():
raise RuntimeError('Connection Pool already initialized')
request.CON_POOL_SIZE = workers + 3
for i in range(workers):
thread = Thread(target=_pooled, name=str(i))
ASYNC_THREADS.add(thread)
thread.start()
else:
self.logger.debug('Thread pool already initialized, skipping.')
def start(self):
"""
@@ -131,7 +142,7 @@ class Dispatcher(object):
self.running = True
self.logger.debug('Dispatcher started')
while True:
while 1:
try:
# Pop update from update queue.
update = self.update_queue.get(True, 1)
@@ -145,7 +156,7 @@ class Dispatcher(object):
continue
self.logger.debug('Processing Update: %s' % update)
self.processUpdate(update)
self.process_update(update)
self.running = False
self.logger.debug('Dispatcher thread stopped')
@@ -160,7 +171,7 @@ class Dispatcher(object):
sleep(0.1)
self.__stop_event.clear()
def processUpdate(self, update):
def process_update(self, update):
"""
Processes a single update.
@@ -170,7 +181,7 @@ class Dispatcher(object):
# An error happened while polling
if isinstance(update, TelegramError):
self.dispatchError(None, update)
self.dispatch_error(None, update)
else:
for group in self.groups:
@@ -185,7 +196,7 @@ class Dispatcher(object):
'Update.')
try:
self.dispatchError(update, te)
self.dispatch_error(update, te)
except Exception:
self.logger.exception('An uncaught error was raised while '
'handling the error')
@@ -271,7 +282,7 @@ class Dispatcher(object):
if callback in self.error_handlers:
self.error_handlers.remove(callback)
def dispatchError(self, update, error):
def dispatch_error(self, update, error):
"""
Dispatches an error.
+17 -12
View File
@@ -309,7 +309,7 @@ class Updater(object):
def _bootstrap(self, max_retries, clean, webhook_url, cert=None):
retries = 0
while True:
while 1:
try:
if clean:
@@ -346,7 +346,7 @@ class Updater(object):
self.job_queue.stop()
with self.__lock:
if self.running:
if self.running or dispatcher.ASYNC_THREADS:
self.logger.debug('Stopping Updater and Dispatcher...')
self.running = False
@@ -354,9 +354,8 @@ class Updater(object):
self._stop_httpd()
self._stop_dispatcher()
self._join_threads()
# async threads must be join()ed only after the dispatcher
# thread was joined, otherwise we can still have new async
# threads dispatched
# async threads must be join()ed only after the dispatcher thread was joined,
# otherwise we can still have new async threads dispatched
self._join_async_threads()
def _stop_httpd(self):
@@ -372,13 +371,19 @@ class Updater(object):
self.dispatcher.stop()
def _join_async_threads(self):
with dispatcher.async_lock:
threads = list(dispatcher.async_threads)
total = len(threads)
for i, thr in enumerate(threads):
self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i, total))
thr.join()
self.logger.debug('async thread {0}/{1} has ended'.format(i, total))
with dispatcher.ASYNC_LOCK:
threads = list(dispatcher.ASYNC_THREADS)
total = len(threads)
# Stop all threads in the thread pool by put()ting one non-tuple per thread
for i in range(total):
dispatcher.ASYNC_QUEUE.put(None)
for i, thr in enumerate(threads):
self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total))
thr.join()
dispatcher.ASYNC_THREADS.remove(thr)
self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total))
def _join_threads(self):
for thr in self.__threads:
+83 -57
View File
@@ -18,29 +18,56 @@
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains methods to make POST and GET requests"""
import functools
import json
import socket
from ssl import SSLError
import logging
from future.moves.http.client import HTTPException
from future.moves.urllib.error import HTTPError, URLError
from future.moves.urllib.request import urlopen, urlretrieve, Request
import certifi
import urllib3
from urllib3.connection import HTTPConnection
from telegram import (InputFile, TelegramError)
from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest
_CON_POOL = None
""":type: urllib3.PoolManager"""
CON_POOL_SIZE = 1
logging.getLogger('urllib3').setLevel(logging.WARNING)
def _get_con_pool():
global _CON_POOL
if _CON_POOL is not None:
return _CON_POOL
_CON_POOL = urllib3.PoolManager(maxsize=CON_POOL_SIZE,
cert_reqs='CERT_REQUIRED',
ca_certs=certifi.where(),
socket_options=HTTPConnection.default_socket_options + [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
])
return _CON_POOL
def is_con_pool_initialized():
return _CON_POOL is not None
def stop_con_pool():
global _CON_POOL
if _CON_POOL is not None:
_CON_POOL.clear()
_CON_POOL = None
def _parse(json_data):
"""Try and parse the JSON returned from Telegram and return an empty
dictionary if there is any error.
Args:
url:
urllib.urlopen object
"""Try and parse the JSON returned from Telegram.
Returns:
A JSON parsed as Python dict with results.
dict: A JSON parsed as Python dict with results - on error this dict will be empty.
"""
decoded_s = json_data.decode('utf-8')
try:
@@ -54,53 +81,49 @@ def _parse(json_data):
return data['result']
def _try_except_req(func):
"""Decorator for requests to handle known exceptions"""
def _request_wrapper(*args, **kwargs):
"""Wraps urllib3 request for handling known exceptions.
@functools.wraps(func)
def decorator(*args, **kwargs):
try:
return func(*args, **kwargs)
Args:
args: unnamed arguments, passed to urllib3 request.
kwargs: keyword arguments, passed tp urllib3 request.
except HTTPError as error:
# `HTTPError` inherits from `URLError` so `HTTPError` handling must
# come first.
errcode = error.getcode()
Returns:
str: A non-parsed JSON text.
try:
message = _parse(error.read())
Raises:
TelegramError
if errcode in (401, 403):
raise Unauthorized()
elif errcode == 400:
raise BadRequest(message)
elif errcode == 502:
raise NetworkError('Bad Gateway')
except ValueError:
message = 'Unknown HTTPError {0}'.format(error.getcode())
"""
raise NetworkError('{0} ({1})'.format(message, errcode))
try:
resp = _get_con_pool().request(*args, **kwargs)
except urllib3.exceptions.TimeoutError as error:
raise TimedOut()
except urllib3.exceptions.HTTPError as error:
# HTTPError must come last as its the base urllib3 exception class
# TODO: do something smart here; for now just raise NetworkError
raise NetworkError('urllib3 HTTPError {0}'.format(error))
except URLError as error:
raise NetworkError('URLError: {0}'.format(error.reason))
if 200 <= resp.status <= 299:
# 200-299 range are HTTP success statuses
return resp.data
except (SSLError, socket.timeout) as error:
err_s = str(error)
if 'operation timed out' in err_s:
raise TimedOut()
try:
message = _parse(resp.data)
except ValueError:
raise NetworkError('Unknown HTTPError {0}'.format(resp.status))
raise NetworkError(err_s)
except HTTPException as error:
raise NetworkError('HTTPException: {0!r}'.format(error))
except socket.error as error:
raise NetworkError('socket.error: {0!r}'.format(error))
return decorator
if resp.status in (401, 403):
raise Unauthorized()
elif resp.status == 400:
raise BadRequest(repr(message))
elif resp.status == 502:
raise NetworkError('Bad Gateway')
else:
raise NetworkError('{0} ({1})'.format(message, resp.status))
@_try_except_req
def get(url):
"""Request an URL.
Args:
@@ -109,13 +132,13 @@ def get(url):
Returns:
A JSON object.
"""
result = urlopen(url).read()
result = _request_wrapper('GET', url)
return _parse(result)
@_try_except_req
def post(url, data, timeout=None):
"""Request an URL.
Args:
@@ -142,16 +165,17 @@ def post(url, data, timeout=None):
if InputFile.is_inputfile(data):
data = InputFile(data)
request = Request(url, data=data.to_form(), headers=data.headers)
result = _request_wrapper('POST', url, body=data.to_form(), headers=data.headers)
else:
data = json.dumps(data)
request = Request(url, data=data.encode(), headers={'Content-Type': 'application/json'})
result = _request_wrapper('POST',
url,
body=data.encode(),
headers={'Content-Type': 'application/json'})
result = urlopen(request, **urlopen_kwargs).read()
return _parse(result)
@_try_except_req
def download(url, filename):
"""Download a file by its URL.
Args:
@@ -160,6 +184,8 @@ def download(url, filename):
filename:
The filename within the path to download the file.
"""
urlretrieve(url, filename)
"""
buf = _request_wrapper('GET', url)
with open(filename, 'wb') as fobj:
fobj.write(buf)
+4 -2
View File
@@ -20,6 +20,7 @@
"""This module contains a object that represents Tests for Telegram Bot"""
import io
import re
from datetime import datetime
import sys
@@ -211,10 +212,11 @@ class BotTest(BaseTest, unittest.TestCase):
@flaky(3, 1)
@timeout(10)
def testLeaveChat(self):
with self.assertRaisesRegexp(telegram.error.BadRequest, 'Chat not found'):
regex = re.compile('chat not found', re.IGNORECASE)
with self.assertRaisesRegexp(telegram.error.BadRequest, regex):
chat = self._bot.leaveChat(-123456)
with self.assertRaisesRegexp(telegram.error.NetworkError, 'Chat not found'):
with self.assertRaisesRegexp(telegram.error.NetworkError, regex):
chat = self._bot.leaveChat(-123456)
@flaky(3, 1)
+2
View File
@@ -31,6 +31,7 @@ else:
sys.path.append('.')
from telegram.utils.request import stop_con_pool
from telegram.ext import JobQueue, Updater
from tests.base import BaseTest
@@ -58,6 +59,7 @@ class JobQueueTest(BaseTest, unittest.TestCase):
def tearDown(self):
if self.jq is not None:
self.jq.stop()
stop_con_pool()
def job1(self, bot):
self.result += 1
+5 -2
View File
@@ -48,6 +48,7 @@ except ImportError:
sys.path.append('.')
from telegram import Update, Message, TelegramError, User, Chat, Bot
from telegram.utils.request import stop_con_pool
from telegram.ext import *
from telegram.ext.dispatcher import run_async
from telegram.error import Unauthorized, InvalidToken
@@ -78,12 +79,14 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.lock = Lock()
def _setup_updater(self, *args, **kwargs):
stop_con_pool()
bot = MockBot(*args, **kwargs)
self.updater = Updater(workers=2, bot=bot)
def tearDown(self):
if self.updater is not None:
self.updater.stop()
stop_con_pool()
def reset(self):
self.message_count = 0
@@ -639,8 +642,8 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self.assertFalse(self.updater.running)
def test_createBot(self):
updater = Updater('123:abcd')
self.assertIsNotNone(updater.bot)
self.updater = Updater('123:abcd')
self.assertIsNotNone(self.updater.bot)
def test_mutualExclusiveTokenBot(self):
bot = Bot('123:zyxw')