Торговый робот с приоритетами обработки потоков (заготовка) — различия между версиями
СмартЛаб (обсуждение | вклад) м |
СмартЛаб (обсуждение | вклад) м (→Код Торгового робота) |
||
(не показаны 4 промежуточные версии этого же участника) | |||
Строка 1: | Строка 1: | ||
== Описание == | == Описание == | ||
+ | При использовании событийной модели Lua скрипт выполняется в двух потоках: функции обратного | ||
+ | вызова выполняются в основном потоке РМ QUIK, а функция main() в дополнительном потоке РМ QUIK | ||
+ | (подробнее см. п. 1). При этом для предотвращения «подвисаний» РМ QUIK необходимо каким-либо | ||
+ | образом оптимизировать сценарии, описанные в функциях обратного вызова. Одним из способов такой | ||
+ | оптимизации является перенос логики обработки полученных сигналов в функцию main(). Данный | ||
+ | подход сводит количество сценариев в функции обратного вызова до одного, а именно добавление в | ||
+ | глобальную Lua таблицу (очередь) записи о том, что функция сработала и вернула определённые | ||
+ | значения. Таким образом, мы получаем очередь событий, которые необходимо обработать в другом | ||
+ | потоке. | ||
+ | |||
+ | Пример реализации очереди FIFO («первым пришёл — первым ушёл») для обработки срабатывания функций обратного вызова в функции main(): | ||
+ | |||
+ | В данном примере используются потокобезопасные функции работы с таблицами Lua sinsert() | ||
+ | и sremove(), которые предоставляет плагин QLua. Формат вызова потокобезопасных функций совпадает | ||
+ | с форматом вызова стандартных функций Lua insert() и remove(), но при выполнении потокобезопасной | ||
+ | функции блокируется выполнение кода в другом потоке до окончания работы вызываемой функции. | ||
+ | Таким образом, исключаются ситуации коллизий, когда два потока одновременно вносят изменения в | ||
+ | одну и ту же таблицу Lua, что может приводить к неопределённым ситуациям. | ||
+ | Из примера видно, что во время инициализации дополнительного потока РМ QUIK объявляется | ||
+ | глобальная Lua таблица MAIN_QUEUE (имя переменной может быть любым), в которую и будут | ||
+ | записываться последовательно все вызовы событий при срабатывании функций обратного вызова. В | ||
+ | дополнительном потоке в функции main() отслеживается размер таблицы MAIN_QUEUE, и, если таблица | ||
+ | не пуста, выполняется обработка очереди событий. После обработки события запись из очереди | ||
+ | удаляется, при этом во время обработки записи с номером «1» в конец очереди беспрепятственно могут | ||
+ | добавляться новые записи при срабатывании функций обратного вызова. После удаления записи с | ||
+ | номером «1», все нижестоящие записи сдвигаются на 1 шаг, и запись с номером «2» становится записью | ||
+ | с номером «1», и таким образом выполняется обработка всей очереди. По окончании обработки всей | ||
+ | очереди скрипт продолжает работать и ожидает новых событий в очереди. Схематично очередь FIFO, | ||
+ | описанную в примере, можно представить следующим образом: | ||
+ | |||
+ | Таким образом, используя данный подход, мы разгружаем основной поток РМ QUIK и переносим все сложные алгоритмы обработки событий РМ в дополнительный поток. При этом ни одно событие не будет | ||
+ | потеряно и будет обработано в порядке общей очереди. Если необходимо создавать очередь, отфильтровывая только определенные события, например изменение котировок только по | ||
+ | определённому инструменту или классу, то в обработчике события можно добавить условие, которое будет следить, что в очередь попадают только необходимые события, а ненужные пропускаются. При | ||
+ | необходимости выделения приоритетных событий нужно выделить для них отдельную очередь и первостепенно выполнять ее обработку, а после – все остальные. | ||
+ | Пример очереди FIFO с выделением приоритета событию получения/изменения сделки и фильтрации изменения котировок только по определенному классу: | ||
+ | |||
В данном примере мы добавили новую Lua таблицу MAIN_QUEUE_TRADES, которая предназначена для создания очереди события добавления/изменения сделки, а в функции main() выделили приоритет для данной очереди, добавив условие обработки общей очереди MAIN_QUEUE только в том случае, если очередь сделок пуста. Также добавили для события OnQuote() фильтрацию по классу инструментов, и таким образом мы уменьшили очередь MAIN_QUEUE, не добавляя в неё ненужные нам события. | В данном примере мы добавили новую Lua таблицу MAIN_QUEUE_TRADES, которая предназначена для создания очереди события добавления/изменения сделки, а в функции main() выделили приоритет для данной очереди, добавив условие обработки общей очереди MAIN_QUEUE только в том случае, если очередь сделок пуста. Также добавили для события OnQuote() фильтрацию по классу инструментов, и таким образом мы уменьшили очередь MAIN_QUEUE, не добавляя в неё ненужные нам события. | ||
Строка 5: | Строка 41: | ||
--Обработка событий РМ QUIK в функции main() посредством очереди FIFO | --Обработка событий РМ QUIK в функции main() посредством очереди FIFO | ||
--с выделением приоритета OnTrade и фильтрации OnQuote | --с выделением приоритета OnTrade и фильтрации OnQuote | ||
− | function OnInit(script) | + | -- Функции вызываемые в основном потоке (тормозят QUIK) |
+ | function OnInit(script) -- инициализация переменных скрипта | ||
is_run = true | is_run = true | ||
− | + | NORMAL_JOB = {} -- таблица очередь обработки событий обычного приоритета | |
− | + | PRIORITET_JOB = {} -- таблица очередь обработки событий высокого приоритета | |
end | end | ||
− | function OnOrder(order) | + | |
− | table.sinsert( | + | -- Функции обработки событий - Основной поток |
+ | function OnOrder(order) -- Событие заказ | ||
+ | table.sinsert(NORMAL_JOB, {callback = "OnOrder", value = order}) | ||
end | end | ||
− | function OnTrade(trade) | + | function OnTrade(trade) -- событие продажа |
− | table.sinsert( | + | table.sinsert(PRIORITET_JOB, trade) |
end | end | ||
− | + | function OnAllTrade(all_trade) | |
− | function OnAllTrade(all_trade) | + | table.sinsert(NORMAL_JOB, {callback = "OnAllTrade", value = all_trade}) |
− | table.sinsert( | ||
end | end | ||
function OnQuote(class_code, sec_code) | function OnQuote(class_code, sec_code) | ||
if class_code == "SPBFUT" then | if class_code == "SPBFUT" then | ||
local quote = getQuoteLevel2(class_code, sec_code) | local quote = getQuoteLevel2(class_code, sec_code) | ||
− | table.sinsert( | + | table.sinsert(NORMAL_JOB, {callback = "OnQuote", value = quote}) |
end | end | ||
end | end | ||
− | function OnStop() | + | |
+ | function OnStop() -- Событие - нажата кнопка остановки скрипта | ||
is_run = false | is_run = false | ||
return 2000 | return 2000 | ||
end | end | ||
+ | |||
+ | -- Дополнительный поток | ||
function main() | function main() | ||
− | while is_run do | + | while is_run do -- основной цикл Дополнительного потока |
− | if # | + | if #NORMAL_JOB > 0 and #PRIORITET_JOB == 0 then -- обработка событий обычного приоритета |
− | ProcessingCallbakc( | + | ProcessingCallbakc(NORMAL_JOB[1]) -- вызов функции обработки событий обычного приоритета |
− | table.sremove( | + | table.sremove(NORMAL_JOB, 1) -- удаление обработанного события из таблицы (удаляется строка) |
− | message("Размер общей очереди " .. tostring(# | + | message("Размер общей очереди " .. tostring(#NORMAL_JOB)) -- преобразует в строку размер таблицы NORMAL_JOB - список НЕ приоритетных событий |
− | elseif # | + | elseif #PRIORITET_JOB > 0 then -- обработка событий Высокого приоритета |
− | + | ProcessingOnTrade(PRIORITET_JOB[1]) -- вызов функции обработки событий высокого приоритета | |
− | table.sremove( | + | table.sremove(PRIORITET_JOB, 1) -- удаление обработанного события из таблицы (удаляется строка) |
− | message("Размер очереди сделок " .. tostring(# | + | message("Размер очереди сделок " .. tostring(#PRIORITET_JOB)) -- преобразует в строку размер таблицы NORMAL_JOB - список приоритетных событий |
− | end | + | end -- конец IF |
− | end | + | end -- конец While |
− | end | + | end -- конец Mine() |
− | + | -- Функции вызываемые в дополнительном потоке (не тормозят QUIK) | |
− | function ProcessingCallbakc(value) | + | function ProcessingCallbakc(value) -- функция обработки событий обычного приоритета |
message(string.format("Обработка события %s начата", value.callback)) | message(string.format("Обработка события %s начата", value.callback)) | ||
sleep(3000) --эмуляция продолжительного алгоритма обработки события | sleep(3000) --эмуляция продолжительного алгоритма обработки события | ||
Строка 50: | Строка 91: | ||
end | end | ||
− | function ProcessingOnTrade(trade) | + | function ProcessingOnTrade(trade) -- функция обработки событий высокого приоритета |
message(string.format("Обработка сделки №%s начата", trade.trade_num)) | message(string.format("Обработка сделки №%s начата", trade.trade_num)) | ||
sleep(3000) --эмуляция продолжительного алгоритма обработки сделки | sleep(3000) --эмуляция продолжительного алгоритма обработки сделки | ||
message(string.format("Обработка сделки №%s завершена", trade.trade_num)) | message(string.format("Обработка сделки №%s завершена", trade.trade_num)) | ||
end | end | ||
− | |||
− | |||
== См. также == | == См. также == |
Текущая версия на 23:41, 31 марта 2020
Содержание
Описание
При использовании событийной модели Lua скрипт выполняется в двух потоках: функции обратного вызова выполняются в основном потоке РМ QUIK, а функция main() в дополнительном потоке РМ QUIK (подробнее см. п. 1). При этом для предотвращения «подвисаний» РМ QUIK необходимо каким-либо образом оптимизировать сценарии, описанные в функциях обратного вызова. Одним из способов такой оптимизации является перенос логики обработки полученных сигналов в функцию main(). Данный подход сводит количество сценариев в функции обратного вызова до одного, а именно добавление в глобальную Lua таблицу (очередь) записи о том, что функция сработала и вернула определённые значения. Таким образом, мы получаем очередь событий, которые необходимо обработать в другом потоке.
Пример реализации очереди FIFO («первым пришёл — первым ушёл») для обработки срабатывания функций обратного вызова в функции main():
В данном примере используются потокобезопасные функции работы с таблицами Lua sinsert() и sremove(), которые предоставляет плагин QLua. Формат вызова потокобезопасных функций совпадает с форматом вызова стандартных функций Lua insert() и remove(), но при выполнении потокобезопасной функции блокируется выполнение кода в другом потоке до окончания работы вызываемой функции. Таким образом, исключаются ситуации коллизий, когда два потока одновременно вносят изменения в одну и ту же таблицу Lua, что может приводить к неопределённым ситуациям. Из примера видно, что во время инициализации дополнительного потока РМ QUIK объявляется глобальная Lua таблица MAIN_QUEUE (имя переменной может быть любым), в которую и будут записываться последовательно все вызовы событий при срабатывании функций обратного вызова. В дополнительном потоке в функции main() отслеживается размер таблицы MAIN_QUEUE, и, если таблица не пуста, выполняется обработка очереди событий. После обработки события запись из очереди удаляется, при этом во время обработки записи с номером «1» в конец очереди беспрепятственно могут добавляться новые записи при срабатывании функций обратного вызова. После удаления записи с номером «1», все нижестоящие записи сдвигаются на 1 шаг, и запись с номером «2» становится записью с номером «1», и таким образом выполняется обработка всей очереди. По окончании обработки всей очереди скрипт продолжает работать и ожидает новых событий в очереди. Схематично очередь FIFO, описанную в примере, можно представить следующим образом:
Таким образом, используя данный подход, мы разгружаем основной поток РМ QUIK и переносим все сложные алгоритмы обработки событий РМ в дополнительный поток. При этом ни одно событие не будет потеряно и будет обработано в порядке общей очереди. Если необходимо создавать очередь, отфильтровывая только определенные события, например изменение котировок только по определённому инструменту или классу, то в обработчике события можно добавить условие, которое будет следить, что в очередь попадают только необходимые события, а ненужные пропускаются. При необходимости выделения приоритетных событий нужно выделить для них отдельную очередь и первостепенно выполнять ее обработку, а после – все остальные. Пример очереди FIFO с выделением приоритета событию получения/изменения сделки и фильтрации изменения котировок только по определенному классу:
В данном примере мы добавили новую Lua таблицу MAIN_QUEUE_TRADES, которая предназначена для создания очереди события добавления/изменения сделки, а в функции main() выделили приоритет для данной очереди, добавив условие обработки общей очереди MAIN_QUEUE только в том случае, если очередь сделок пуста. Также добавили для события OnQuote() фильтрацию по классу инструментов, и таким образом мы уменьшили очередь MAIN_QUEUE, не добавляя в неё ненужные нам события.
Код Торгового робота
--Обработка событий РМ QUIK в функции main() посредством очереди FIFO --с выделением приоритета OnTrade и фильтрации OnQuote -- Функции вызываемые в основном потоке (тормозят QUIK) function OnInit(script) -- инициализация переменных скрипта is_run = true NORMAL_JOB = {} -- таблица очередь обработки событий обычного приоритета PRIORITET_JOB = {} -- таблица очередь обработки событий высокого приоритета end
-- Функции обработки событий - Основной поток function OnOrder(order) -- Событие заказ table.sinsert(NORMAL_JOB, {callback = "OnOrder", value = order}) end function OnTrade(trade) -- событие продажа table.sinsert(PRIORITET_JOB, trade) end function OnAllTrade(all_trade) table.sinsert(NORMAL_JOB, {callback = "OnAllTrade", value = all_trade}) end function OnQuote(class_code, sec_code) if class_code == "SPBFUT" then local quote = getQuoteLevel2(class_code, sec_code) table.sinsert(NORMAL_JOB, {callback = "OnQuote", value = quote}) end end
function OnStop() -- Событие - нажата кнопка остановки скрипта is_run = false return 2000 end
-- Дополнительный поток function main() while is_run do -- основной цикл Дополнительного потока if #NORMAL_JOB > 0 and #PRIORITET_JOB == 0 then -- обработка событий обычного приоритета ProcessingCallbakc(NORMAL_JOB[1]) -- вызов функции обработки событий обычного приоритета table.sremove(NORMAL_JOB, 1) -- удаление обработанного события из таблицы (удаляется строка) message("Размер общей очереди " .. tostring(#NORMAL_JOB)) -- преобразует в строку размер таблицы NORMAL_JOB - список НЕ приоритетных событий elseif #PRIORITET_JOB > 0 then -- обработка событий Высокого приоритета ProcessingOnTrade(PRIORITET_JOB[1]) -- вызов функции обработки событий высокого приоритета table.sremove(PRIORITET_JOB, 1) -- удаление обработанного события из таблицы (удаляется строка) message("Размер очереди сделок " .. tostring(#PRIORITET_JOB)) -- преобразует в строку размер таблицы NORMAL_JOB - список приоритетных событий end -- конец IF end -- конец While end -- конец Mine() -- Функции вызываемые в дополнительном потоке (не тормозят QUIK) function ProcessingCallbakc(value) -- функция обработки событий обычного приоритета message(string.format("Обработка события %s начата", value.callback)) sleep(3000) --эмуляция продолжительного алгоритма обработки события message(string.format("Обработка события %s завершена", value.callback)) end function ProcessingOnTrade(trade) -- функция обработки событий высокого приоритета message(string.format("Обработка сделки №%s начата", trade.trade_num)) sleep(3000) --эмуляция продолжительного алгоритма обработки сделки message(string.format("Обработка сделки №%s завершена", trade.trade_num)) end
См. также
Ссылки
- https://smart-lab.ru/algotrading/Есть%20ли%20в%20свободном%20доступе%20пример%20самого%20простого%20торгового%20робота%20на%20QLua Обсуждение этого торгового робота