Торговый робот с приоритетами обработки потоков (заготовка)

Материал из ТХАБ.РФ
Перейти к: навигация, поиск

Описание

При использовании событийной модели Lua скрипт выполняется в двух потоках: функции обратного вызова выполняются в основном потоке РМ QUIK, а функция main() в дополнительном потоке РМ QUIK (подробнее см. п. 1). При этом для предотвращения «подвисаний» РМ QUIK необходимо каким-либо образом оптимизировать сценарии, описанные в функциях обратного вызова. Одним из способов такой оптимизации является перенос логики обработки полученных сигналов в функцию main(). Данный подход сводит количество сценариев в функции обратного вызова до одного, а именно добавление в глобальную Lua таблицу (очередь) записи о том, что функция сработала и вернула определённые значения. Таким образом, мы получаем очередь событий, которые необходимо обработать в другом потоке.

Пример реализации очереди FIFO («первым пришёл — первым ушёл») для обработки срабатывания функций обратного вызова в функции main():

В данном примере используются потокобезопасные функции работы с таблицами Lua sinsert() и sremove(), которые предоставляет плагин QLua. Формат вызова потокобезопасных функций совпадает с форматом вызова стандартных функций Lua insert() и remove(), но при выполнении потокобезопасной функции блокируется выполнение кода в другом потоке до окончания работы вызываемой функции. Таким образом, исключаются ситуации коллизий, когда два потока одновременно вносят изменения в одну и ту же таблицу Lua, что может приводить к неопределённым ситуациям. Из примера видно, что во время инициализации дополнительного потока РМ QUIK объявляется глобальная Lua таблица MAIN_QUEUE (имя переменной может быть любым), в которую и будут записываться последовательно все вызовы событий при срабатывании функций обратного вызова. В дополнительном потоке в функции main() отслеживается размер таблицы MAIN_QUEUE, и, если таблица не пуста, выполняется обработка очереди событий. После обработки события запись из очереди удаляется, при этом во время обработки записи с номером «1» в конец очереди беспрепятственно могут добавляться новые записи при срабатывании функций обратного вызова. После удаления записи с номером «1», все нижестоящие записи сдвигаются на 1 шаг, и запись с номером «2» становится записью с номером «1», и таким образом выполняется обработка всей очереди. По окончании обработки всей очереди скрипт продолжает работать и ожидает новых событий в очереди. Схематично очередь FIFO, описанную в примере, можно представить следующим образом:

Таким образом, используя данный подход, мы разгружаем основной поток РМ QUIK и переносим все сложные алгоритмы обработки событий РМ в дополнительный поток. При этом ни одно событие не будет потеряно и будет обработано в порядке общей очереди. Если необходимо создавать очередь, отфильтровывая только определенные события, например изменение котировок только по определённому инструменту или классу, то в обработчике события можно добавить условие, которое будет следить, что в очередь попадают только необходимые события, а ненужные пропускаются. При необходимости выделения приоритетных событий нужно выделить для них отдельную очередь и первостепенно выполнять ее обработку, а после – все остальные. Пример очереди FIFO с выделением приоритета событию получения/изменения сделки и фильтрации изменения котировок только по определенному классу:

В данном примере мы добавили новую Lua таблицу MAIN_QUEUE_TRADES, которая предназначена для создания очереди события добавления/изменения сделки, а в функции main() выделили приоритет для данной очереди, добавив условие обработки общей очереди MAIN_QUEUE только в том случае, если очередь сделок пуста. Также добавили для события OnQuote() фильтрацию по классу инструментов, и таким образом мы уменьшили очередь MAIN_QUEUE, не добавляя в неё ненужные нам события.

Код Торгового робота

--Обработка событий РМ QUIK в функции main() посредством очереди FIFO
--с выделением приоритета OnTrade и фильтрации OnQuote
 -- Функции вызываемые в основном потоке (тормозят QUIK)
function OnInit(script) -- инициализация переменных скрипта
  is_run = true
  NORMAL_JOB = {} -- таблица очередь обработки событий обычного приоритета
  PRIORITET_JOB = {} -- таблица очередь обработки событий высокого приоритета
end
-- Функции обработки событий - Основной поток
function OnOrder(order) -- Событие  заказ
  table.sinsert(NORMAL_JOB, {callback = "OnOrder", value = order})
end
function OnTrade(trade) -- событие продажа
 table.sinsert(PRIORITET_JOB, trade)
end
function OnAllTrade(all_trade) 
  table.sinsert(NORMAL_JOB, {callback = "OnAllTrade", value = all_trade})
end
function OnQuote(class_code, sec_code)
  if class_code == "SPBFUT" then
  local quote = getQuoteLevel2(class_code, sec_code)
  table.sinsert(NORMAL_JOB, {callback = "OnQuote", value = quote})
 end
end
function OnStop() -- Событие - нажата кнопка остановки скрипта 
  is_run = false
  return 2000
end
-- Дополнительный поток
function main()
 while is_run do -- основной цикл Дополнительного потока
  if #NORMAL_JOB > 0 and #PRIORITET_JOB == 0 then -- обработка событий обычного приоритета
   ProcessingCallbakc(NORMAL_JOB[1])  -- вызов функции обработки событий обычного приоритета
   table.sremove(NORMAL_JOB, 1) -- удаление обработанного события из таблицы (удаляется строка)
   message("Размер общей очереди " .. tostring(#NORMAL_JOB)) -- преобразует в строку размер таблицы NORMAL_JOB - список НЕ приоритетных событий
   elseif #PRIORITET_JOB > 0 then -- обработка событий Высокого приоритета
     ProcessingOnTrade(PRIORITET_JOB[1]) -- вызов функции обработки событий высокого приоритета
   table.sremove(PRIORITET_JOB, 1) -- удаление обработанного события из таблицы (удаляется строка)
   message("Размер очереди сделок " .. tostring(#PRIORITET_JOB)) -- преобразует в строку размер таблицы NORMAL_JOB - список приоритетных событий
  end
 end
end
-- Функции вызываемые в дополнительном потоке (не тормозят QUIK)
function ProcessingCallbakc(value) -- функция обработки событий обычного приоритета
  message(string.format("Обработка события %s начата", value.callback))
  sleep(3000) --эмуляция продолжительного алгоритма обработки события
  message(string.format("Обработка события %s завершена", value.callback))
end

function ProcessingOnTrade(trade) -- функция обработки событий высокого приоритета
  message(string.format("Обработка сделки №%s начата", trade.trade_num))
  sleep(3000) --эмуляция продолжительного алгоритма обработки сделки
  message(string.format("Обработка сделки №%s завершена", trade.trade_num))
end

См. также

Ссылки