Как ограничить время выполнения вызова функции в Python

В моем коде есть вызов функции, связанной с сокетом, эта функция из другого модуля, таким образом, вне моего контроля, проблема в том, что она иногда блокируется на несколько часов, что совершенно неприемлемо. Как я могу ограничить время выполнения функции из своего кода? Я думаю, что решение должно использовать другой поток.

14.12.2008 16:20:24
Возможная
SmartManoj 8.05.2019 13:07:59
9 ОТВЕТОВ
РЕШЕНИЕ

Я не уверен, насколько кроссплатформенным это может быть, но использование сигналов и сигналов тревоги может быть хорошим способом посмотреть на это. Немного поработав, вы можете сделать это полностью универсальным и пригодным для использования в любой ситуации.

http://docs.python.org/library/signal.html

Итак, ваш код будет выглядеть примерно так.

import signal

def signal_handler(signum, frame):
    raise Exception("Timed out!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(10)   # Ten seconds
try:
    long_function_call()
except Exception, msg:
    print "Timed out!"
39
22.02.2013 09:11:46
Что если я использую будильник для чего-то еще? :-)
mat 21.12.2008 20:30:35
Да, наверное, стоит выбрать один или другой. Я сам парень процесса / сигнала. После просмотра blip.tv/file/2232410 я все меньше доверяю модели потоков Python.
rik.the.vik 14.07.2009 16:45:02
Кроме того, это не отключает будильник впоследствии
Casebash 19.10.2009 12:14:43
signal.alarm и signal.SIGALRM доступны только в Unix.
Logan 31.12.2017 01:39:50

Вам не нужно использовать темы. Вы можете использовать другой процесс для выполнения работы по блокировке, например, используя модуль подпроцесса . Если вы хотите обмениваться структурами данных между различными частями вашей программы, тогда Twisted - это отличная библиотека, позволяющая вам самим управлять этим, и я бы порекомендовал ее, если вы заботитесь о блокировке и ожидаете много неприятностей. Плохая новость с Twisted заключается в том, что вы должны переписать свой код, чтобы избежать каких-либо блокировок, и есть хорошая кривая обучения.

Вы можете использовать потоки, чтобы избежать блокировки, но я бы расценил это как последнее средство, так как это подвергает вас целому миру боли. Прочитайте хорошую книгу по параллелизму, прежде чем даже подумать об использовании потоков в производстве, например, «Параллельные системы» Жана Бэкона. Я работаю с группой людей, которые действительно крутят высокопроизводительные вещи с помощью потоков, и мы не внедряем потоки в проекты, если они нам действительно не нужны.

5
14.12.2008 17:13:12

Единственный «безопасный» способ сделать это на любом языке - это использовать вторичный процесс для выполнения этого тайм-аута, в противном случае вам нужно создать свой код таким образом, чтобы он сам по себе был безопасным, например, путем проверка времени, прошедшего в цикле или аналогичном. Если изменение метода не вариант, потока не будет достаточно.

Почему? Потому что вы рискуете оставить вещи в плохом состоянии, когда вы это делаете. Если поток просто прерван в середине метода, удерживаемые блокировки и т. Д. Будут просто удерживаться и не могут быть освобождены.

Так что смотрите на путь процесса, не смотрите на путь потока.

4
14.12.2008 17:20:18

Вот функция тайм-аута, я думаю, я нашел через Google, и она работает для меня.

От: http://code.activestate.com/recipes/473878/

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    '''This function will spwan a thread and run the given function using the args, kwargs and 
    return the given default value if the timeout_duration is exceeded 
    ''' 
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            try:
                self.result = func(*args, **kwargs)
            except:
                self.result = default
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return it.result
    else:
        return it.result   
1
15.12.2008 06:25:46
Почему обе части блока возврата if / else возвращают it.result?
NorthIsUp 13.09.2010 20:58:38
Это было мое редактирование для конкретной проблемы, с которой я имел дело. Я не помню почему. В любом случае оригинал вернул «default», если это it.isAlive ().
monkut 16.09.2010 01:22:16

Улучшение в ответе @ rik.the.vik будет состоять в том, чтобы использовать withоператор, чтобы дать функции тайм-аута некоторый синтаксический сахар:

import signal
from contextlib import contextmanager

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


try:
    with time_limit(10):
        long_function_call()
except TimeoutException as e:
    print("Timed out!")
90
14.09.2017 21:32:07
try: yield \n finally: signal.alarm(0)
jfs 19.10.2009 17:09:55
Чтобы быть очень суетливым, относится ли сигнал.сигнала (секунды) к попытке?
Casebash 28.10.2009 11:00:31
Ну, это не задокументировано как поднятие каких-либо исключений, так что, вероятно, нет.
Josh Lee 28.10.2009 13:21:09
вместо последнего «Тайм-аут!» Вы, вероятно, намеревались написать сообщение
Olga 21.12.2013 15:43:32
Здравствуйте! Я попытался реализовать этот метод в Python 3.4, и столкнулся с ошибкой, с которой мне трудно справиться: AttributeError: exit Функция обратного отслеживания предоставляет только это: Файл "<stdin>", строка 1, в <module>. исправить эту проблему?
David Scott 5.07.2016 15:29:45

Делать это из обработчика сигнала опасно: вы можете находиться внутри обработчика исключений в момент возникновения исключения и оставлять вещи в неисправном состоянии. Например,

def function_with_enforced_timeout():
  f = open_temporary_file()
  try:
   ...
  finally:
   here()
   unlink(f.filename)

Если ваше исключение возникает здесь (), временный файл никогда не будет удален.

Решение здесь заключается в том, чтобы асинхронные исключения откладывались до тех пор, пока код не окажется внутри кода обработки исключений (исключение или окончательный блок), но Python этого не делает.

Обратите внимание, что это ничего не прерывает при выполнении нативного кода; он прервет его только когда функция вернется, так что это может не помочь в данном конкретном случае. (SIGALRM сам может прервать блокирующий вызов, но код сокета обычно просто повторяется после EINTR.)

Делать это с потоками - лучшая идея, поскольку она более переносима, чем сигналы. Поскольку вы запускаете рабочий поток и блокируете его до конца, нет никаких обычных проблем с параллелизмом. К сожалению, нет способа доставить исключение асинхронно другому потоку в Python (другие API потока могут сделать это). У него также будет та же проблема с отправкой исключения во время обработчика исключения, и потребуется такое же исправление.

7
11.07.2009 20:30:32
нативный код может вызываться PyErr_CheckSignals()перед перезапуском в EINTR, чтобы позволить обработчику сигналов Python работать.
jfs 22.02.2016 11:44:42

Вот способ Linux / OSX ограничить время выполнения функции. Это в том случае, если вы не хотите использовать потоки и хотите, чтобы ваша программа ожидала завершения функции или истечения срока.

from multiprocessing import Process
from time import sleep

def f(time):
    sleep(time)


def run_with_limited_time(func, args, kwargs, time):
    """Runs a function with time limit

    :param func: The function to run
    :param args: The functions args, given as tuple
    :param kwargs: The functions keywords, given as dict
    :param time: The time limit in seconds
    :return: True if the function ended successfully. False if it was terminated.
    """
    p = Process(target=func, args=args, kwargs=kwargs)
    p.start()
    p.join(time)
    if p.is_alive():
        p.terminate()
        return False

    return True


if __name__ == '__main__':
    print run_with_limited_time(f, (1.5, ), {}, 2.5) # True
    print run_with_limited_time(f, (3.5, ), {}, 2.5) # False
17
4.09.2016 09:54:34
Прекрасно работает на моем ноутбуке с Linux, не работает на Windows и почти не работает на OSX. Не твоя вина, просто чудесный мир программирования.
Ameo 20.05.2016 19:04:51
@CaseyPrimozic Вы порождаете новые процессы, это может быть причиной. Я
Ariel Cabib 4.09.2016 09:52:51

Я обычно предпочитаю использовать контекстный менеджер, как предложено @ josh-lee

Но в случае, если кто-то заинтересован, чтобы это было реализовано в качестве декоратора, вот альтернатива.

Вот как это будет выглядеть:

import time
from timeout import timeout

class Test(object):
    @timeout(2)
    def test_a(self, foo, bar):
        print foo
        time.sleep(1)
        print bar
        return 'A Done'

    @timeout(2)
    def test_b(self, foo, bar):
        print foo
        time.sleep(3)
        print bar
        return 'B Done'

t = Test()
print t.test_a('python', 'rocks')
print t.test_b('timing', 'out')

И это timeout.pyмодуль:

import threading

class TimeoutError(Exception):
    pass

class InterruptableThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        threading.Thread.__init__(self)
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._result = None

    def run(self):
        self._result = self._func(*self._args, **self._kwargs)

    @property
    def result(self):
        return self._result


class timeout(object):
    def __init__(self, sec):
        self._sec = sec

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            it = InterruptableThread(f, *args, **kwargs)
            it.start()
            it.join(self._sec)
            if not it.is_alive():
                return it.result
            raise TimeoutError('execution expired')
        return wrapped_f

Выход:

python
rocks
A Done
timing
Traceback (most recent call last):
  ...
timeout.TimeoutError: execution expired
out

Обратите внимание, что даже если TimeoutErrorвыброшен, декорированный метод будет продолжать работать в другом потоке. Если вы также хотите, чтобы этот поток был "остановлен", посмотрите: есть ли способ убить поток в Python?

1
23.05.2017 11:46:56
1 - класс InterruptableThread без метода прерывания. 2 - даже после истечения срока выполнения поток все еще выполняет функцию
Sergey11g 23.10.2017 12:32:47

Я предпочитаю подход контекстного менеджера, потому что он позволяет выполнять несколько операторов Python внутри with time_limitоператора. Поскольку система Windows не имеет SIGALARM, более портативный и, возможно, более простой метод может использоватьTimer

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception):
    def __init__(self, msg=''):
        self.msg = msg

@contextmanager
def time_limit(seconds, msg=''):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        raise TimeoutException("Timed out for operation {}".format(msg))
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

import time
# ends after 5 seconds
with time_limit(5, 'sleep'):
    for i in range(10):
        time.sleep(1)

# this will actually end after 10 seconds
with time_limit(5, 'sleep'):
    time.sleep(10)

Основным приемом здесь является использование _thread.interrupt_mainпрерывания основного потока из потока таймера. Одно предостережение заключается в том, что основной поток не всегда быстро реагирует на KeyboardInterruptподнятые Timer. Например, time.sleep()вызывает системную функцию, поэтому KeyboardInterruptпосле sleepвызова обрабатывается a .

13
6.06.2016 01:52:57
Это кажется самым чистым. Есть ли способ отличить прерывание от прерывания клавиатуры, отправленного пользователем?
user6798019 13.07.2017 22:17:00
Кроме того, это убьет все потоки и не подходит как таймер для уничтожения свободных потоков в многопоточной операции
Alexander McFarlane 18.01.2018 16:15:35
Поскольку threadмодуль был переименован _threadв Python 3, для совместимости с Python 2 вам нужно будет сделать: import sys if sys.version_info[0] < 3: from thread import interrupt_main else: from _thread import interrupt_mainа затем позже:timer = threading.Timer(seconds, lambda: interrupt_main())
Roland Pihlakas 19.05.2018 18:03:50