Skip to content

Commit 6c9474d

Browse files
fix(PubSub,Subscribe): return when server sends a response
1 parent 8467e65 commit 6c9474d

File tree

1 file changed

+32
-23
lines changed

1 file changed

+32
-23
lines changed

src/CoreApi/PubSubApi.cs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -117,43 +117,52 @@ internal PubSubApi(IpfsClient ipfs)
117117
public async Task Subscribe(string topic, Action<PublishedMessage> handler, CancellationToken cancellationToken)
118118
{
119119
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, topic);
120+
var sr = new StreamReader(messageStream);
121+
var response = sr.ReadLine();
122+
if (log.IsDebugEnabled)
123+
log.Debug("RSP " + response);
124+
125+
// First line is always an empty JSON object,
120126
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
121-
Task.Run(() => ProcessMessages(topic, handler, messageStream, cancellationToken));
127+
Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken));
122128
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
129+
123130
return;
124131
}
125132

126-
void ProcessMessages(string topic, Action<PublishedMessage> handler, Stream stream, CancellationToken ct)
133+
void ProcessMessages(string topic, Action<PublishedMessage> handler, StreamReader sr, CancellationToken ct)
127134
{
128135
log.DebugFormat("Start listening for '{0}' messages", topic);
129136

130-
using (var sr = new StreamReader(stream))
137+
// .Net needs a ReadLine(CancellationToken)
138+
// As a work-around, we register a function to close the stream
139+
ct.Register(() => sr.Dispose());
140+
try
131141
{
132-
// .Net needs a ReadLine(CancellationToken)
133-
// As a work-around, we register a function to close the stream
134-
ct.Register(() => sr.Dispose());
135-
try
142+
while (!sr.EndOfStream && !ct.IsCancellationRequested)
136143
{
137-
while (!sr.EndOfStream && !ct.IsCancellationRequested)
144+
var json = sr.ReadLine();
145+
if (json == null)
146+
break;
147+
if (log.IsDebugEnabled)
148+
log.DebugFormat("PubSub message {0}", json);
149+
if (json != "{}" && !ct.IsCancellationRequested)
138150
{
139-
var json = sr.ReadLine();
140-
if (json == null)
141-
break;
142-
if (log.IsDebugEnabled)
143-
log.DebugFormat("PubSub message {0}", json);
144-
if (json != "{}" && !ct.IsCancellationRequested)
145-
{
146-
handler(new PublishedMessage(json));
147-
}
151+
handler(new PublishedMessage(json));
148152
}
149153
}
150-
catch (Exception e)
151-
{
152-
// Do not report errors when cancelled.
153-
if (!ct.IsCancellationRequested)
154-
log.Error(e);
155-
}
156154
}
155+
catch (Exception e)
156+
{
157+
// Do not report errors when cancelled.
158+
if (!ct.IsCancellationRequested)
159+
log.Error(e);
160+
}
161+
finally
162+
{
163+
sr.Dispose();
164+
}
165+
157166
log.DebugFormat("Stop listening for '{0}' messages", topic);
158167
}
159168

0 commit comments

Comments
 (0)