Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
Communications Backbone System
communications-backbone
Commits
46238f57
Commit
46238f57
authored
1 year ago
by
James Kirk
Browse files
Options
Download
Email Patches
Plain Diff
refactor: black linting
parent
48a3eeff
dev
11-add-websockets
master
v1.0.0
2 merge requests
!26
Resolve "Release v1.0.0"
,
!25
refactor: changed to an async connection method, sharing a channel as much as we can
Pipeline
#134588
passed with stages
in 2 minutes and 5 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
27 additions
and
27 deletions
+27
-27
rmq.py
rmq.py
+7
-2
soar_bus.py
soar_bus.py
+20
-25
No files found.
rmq.py
View file @
46238f57
...
...
@@ -11,6 +11,7 @@ host = os.getenv(
# -------------------------------------------------------------------------------------------------------------------------------------------------------------
def
setup_queue
(
channel
,
queue_name
=
""
):
channel
.
queue_declare
(
queue
=
queue_name
,
exclusive
=
False
,
durable
=
True
...
...
@@ -130,7 +131,9 @@ def broadcast(channel, queue_name, exchange_name):
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
try
:
return
channel
.
basic_consume
(
queue
=
queue_name
,
on_message_callback
=
broadcast_callback
)
return
channel
.
basic_consume
(
queue
=
queue_name
,
on_message_callback
=
broadcast_callback
)
except
pika
.
exceptions
.
AMQPChannelError
as
err
:
print
(
"Caught a channel error: {}, stopping..."
.
format
(
err
))
...
...
@@ -154,7 +157,9 @@ def forward(channel, from_queue, to_queue):
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
try
:
return
channel
.
basic_consume
(
queue
=
from_queue
,
on_message_callback
=
forward_callback
)
return
channel
.
basic_consume
(
queue
=
from_queue
,
on_message_callback
=
forward_callback
)
except
pika
.
exceptions
.
AMQPChannelError
as
err
:
logging
.
error
(
"Caught a channel error: {}, stopping..."
.
format
(
err
))
...
...
This diff is collapsed.
Click to expand it.
soar_bus.py
View file @
46238f57
...
...
@@ -31,6 +31,7 @@ EXCHANGES = {
"broadcast"
:
"soar_broadcast"
,
}
class
ConfigHandler
(
FileSystemEventHandler
):
def
__init__
(
self
,
channel
):
self
.
client_model
=
ClientModel
()
...
...
@@ -70,8 +71,14 @@ class ConfigHandler(FileSystemEventHandler):
self
.
channel
.
basic_cancel
(
client_tags
[
f
"
{
client_id
}
-broadcast"
])
self
.
channel
.
basic_cancel
(
client_tags
[
f
"
{
client_id
}
-outbox"
])
self
.
channel
.
queue_unbind
(
queue
=
f
"
{
client_id
}
-inbox"
,
exchange
=
client_tags
[
f
"
{
client_id
}
-inbox-publish"
])
self
.
channel
.
queue_unbind
(
queue
=
f
"
{
client_id
}
-inbox"
,
exchange
=
client_tags
[
f
"
{
client_id
}
-inbox-broadcast"
])
self
.
channel
.
queue_unbind
(
queue
=
f
"
{
client_id
}
-inbox"
,
exchange
=
client_tags
[
f
"
{
client_id
}
-inbox-publish"
],
)
self
.
channel
.
queue_unbind
(
queue
=
f
"
{
client_id
}
-inbox"
,
exchange
=
client_tags
[
f
"
{
client_id
}
-inbox-broadcast"
],
)
if
client_id
in
RUNNING_CLIENTS
:
RUNNING_CLIENTS
.
remove
(
client_id
)
...
...
@@ -88,7 +95,7 @@ class WatchConfigThread(threading.Thread):
self
.
running_clients
=
running_clients
self
.
channel
=
channel
self
.
start
()
def
run
(
self
):
logging
.
info
(
"Starting config watcher..."
)
event_handler
=
ConfigHandler
(
self
.
channel
)
...
...
@@ -115,11 +122,7 @@ class SoarBusThread(threading.Thread):
def
run
(
self
):
logging
.
info
(
"Starting SOAR bus..."
)
publish
(
self
.
channel
,
"soar-publish"
,
EXCHANGES
.
get
(
"publish"
)
)
publish
(
self
.
channel
,
"soar-publish"
,
EXCHANGES
.
get
(
"publish"
))
for
id
in
self
.
clients
.
keys
():
run_client
(
id
,
self
.
channel
)
...
...
@@ -135,15 +138,9 @@ def run_client(client_id, channel):
client_id
=
client
[
"client_id"
]
logging
.
info
(
f
"Running client:
{
client_id
}
"
)
forward_consumer_tag
=
forward
(
channel
,
f
"
{
client_id
}
-outbox"
,
"soar-publish"
)
forward_consumer_tag
=
forward
(
channel
,
f
"
{
client_id
}
-outbox"
,
"soar-publish"
)
broadcast_consumer_tag
=
broadcast
(
channel
,
f
"
{
client_id
}
-broadcast"
,
EXCHANGES
.
get
(
"broadcast"
)
channel
,
f
"
{
client_id
}
-broadcast"
,
EXCHANGES
.
get
(
"broadcast"
)
)
subscribe
(
channel
,
...
...
@@ -151,11 +148,7 @@ def run_client(client_id, channel):
EXCHANGES
.
get
(
"publish"
),
client
[
"subscription"
],
)
subscribe
(
channel
,
f
"
{
client_id
}
-inbox"
,
EXCHANGES
.
get
(
"broadcast"
)
)
subscribe
(
channel
,
f
"
{
client_id
}
-inbox"
,
EXCHANGES
.
get
(
"broadcast"
))
CONSUMER_TAGS
[
client_id
]
=
{
f
"
{
client_id
}
-broadcast"
:
broadcast_consumer_tag
,
...
...
@@ -211,14 +204,16 @@ if __name__ == "__main__":
pingcounter
+=
1
s
.
close
()
host
=
os
.
getenv
(
"MQ_HOST"
,
"localhost"
host
=
os
.
getenv
(
"MQ_HOST"
,
"localhost"
)
connection
=
pika
.
SelectConnection
(
pika
.
ConnectionParameters
(
host
),
on_open_callback
=
on_connection_open
,
on_close_callback
=
on_connection_close
,
)
connection
=
pika
.
SelectConnection
(
pika
.
ConnectionParameters
(
host
),
on_open_callback
=
on_connection_open
,
on_close_callback
=
on_connection_close
)
try
:
connection
.
ioloop
.
start
()
except
KeyboardInterrupt
:
# Loop until fully closed
connection
.
close
()
connection
.
ioloop
.
start
()
\ No newline at end of file
connection
.
ioloop
.
start
()
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment