From 057634fba8ae8b02874b732644f83ab41b8b0a2e Mon Sep 17 00:00:00 2001 From: Kushagra Date: Thu, 9 Apr 2026 04:52:52 +0530 Subject: [PATCH] fix: added expired_at filter to message pipeline --- src/handlers/subscribe-message-handler.ts | 10 ++- .../subscribe-message-handler.spec.ts | 61 ++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts index 1ab4e322..87fbde95 100644 --- a/src/handlers/subscribe-message-handler.ts +++ b/src/handlers/subscribe-message-handler.ts @@ -4,7 +4,7 @@ import { pipeline } from 'stream/promises' import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages' import { IAbortable, IMessageHandler } from '../@types/message-handlers' -import { isEventMatchingFilter, toNostrEvent } from '../utils/event' +import { isEventMatchingFilter, isExpiredEvent, toNostrEvent } from '../utils/event' import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream' import { SubscriptionFilter, SubscriptionId } from '../@types/subscription' import { createLogger } from '../factories/logger-factory' @@ -55,6 +55,12 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { const sendEOSE = () => this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId)) const isSubscribedToEvent = SubscribeMessageHandler.isClientSubscribedToEvent(filters) + const isNotExpired = (event: Event)=>{ + if (isExpiredEvent(event)) { + return false + } + return true + } const findEvents = this.eventRepository.findByFilters(filters).stream() @@ -65,6 +71,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { findEvents, streamFilter(propSatisfies(isNil, 'deleted_at')), streamMap(toNostrEvent), + streamFilter(isNotExpired), streamFilter(isSubscribedToEvent), streamEach(sendEvent), streamEnd(sendEOSE), @@ -117,3 +124,4 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { } } + diff --git a/test/unit/handlers/subscribe-message-handler.spec.ts b/test/unit/handlers/subscribe-message-handler.spec.ts index adcdbb5a..c269a75e 100644 --- a/test/unit/handlers/subscribe-message-handler.spec.ts +++ b/test/unit/handlers/subscribe-message-handler.spec.ts @@ -3,6 +3,7 @@ import chai from 'chai' import chaiAsPromised from 'chai-as-promised' import EventEmitter from 'events' import Sinon from 'sinon' +import sinonChai from 'sinon-chai' import { IAbortable, IMessageHandler } from '../../../src/@types/message-handlers' import { MessageType, SubscribeMessage } from '../../../src/@types/messages' @@ -14,10 +15,14 @@ import { PassThrough } from 'stream' import { SubscribeMessageHandler } from '../../../src/handlers/subscribe-message-handler' import { WebSocketAdapterEvent } from '../../../src/constants/adapter' +chai.use(sinonChai) chai.use(chaiAsPromised) const { expect } = chai -const toDbEvent = (event: Event) => ({ +const toDbEvent = ( + event: Event, + metadata: { expires_at?: number, deleted_at?: Date | null } = {}, +) => ({ event_id: Buffer.from(event.id, 'hex'), event_kind: event.kind, event_pubkey: Buffer.from(event.pubkey, 'hex'), @@ -25,6 +30,7 @@ const toDbEvent = (event: Event) => ({ event_content: event.content, event_tags: event.tags, event_signature: Buffer.from(event.sig, 'hex'), + ...metadata, }) describe('SubscribeMessageHandler', () => { @@ -112,11 +118,13 @@ describe('SubscribeMessageHandler', () => { describe('#fetchAndSend', () => { let event: Event + let clock: Sinon.SinonFakeTimers let webSocketOnMessageStub: Sinon.SinonStub let webSocketOnSubscribeStub: Sinon.SinonStub let isClientSubscribedToEventStub: Sinon.SinonStub beforeEach(() => { + clock = Sinon.useFakeTimers(1665546189000) event = { 'id': 'b1601d26958e6508b7b9df0af609c652346c09392b6534d93aead9819a51b4ef', 'pubkey': '22e804d26ed16b68db5259e78449e96dab5d464c8f470bda3eb1a70467f2c793', @@ -136,6 +144,10 @@ describe('SubscribeMessageHandler', () => { //streamEndSpy = sandbox.spy(Stream, '_end' as any) }) + afterEach(() => { + clock.restore() + }) + it('does not send event if client is not subscribed to it', async () => { isClientSubscribedToEventStub.returns(always(false)) @@ -165,6 +177,53 @@ describe('SubscribeMessageHandler', () => { ) }) + it('does not send expired events', async () => { + isClientSubscribedToEventStub.returns(always(true)) + + const now = Math.floor(clock.now / 1000) + const promise = (handler as any).fetchAndSend(subscriptionId, filters) + + const expiredEvent: Event = { + ...event, + tags: [['expiration', String(now - 1)] as any], + } + + stream.write(toDbEvent(expiredEvent)) + stream.end() + + await promise + + expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters) + expect(webSocketOnMessageStub).to.have.been.calledOnceWithExactly( + ['EOSE', subscriptionId], + ) + }) + + it('sends event if expiration is in the future', async () => { + isClientSubscribedToEventStub.returns(always(true)) + + const now = Math.floor(clock.now / 1000) + const promise = (handler as any).fetchAndSend(subscriptionId, filters) + + const eventWithFutureExpiration: Event = { + ...event, + tags: [['expiration', String(now + 60)] as any], + } + + stream.write(toDbEvent(eventWithFutureExpiration)) + stream.end() + + await promise + + expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters) + expect(webSocketOnMessageStub).to.have.been.calledWithExactly( + ['EVENT', subscriptionId, eventWithFutureExpiration], + ) + expect(webSocketOnMessageStub).to.have.been.calledWithExactly( + ['EOSE', subscriptionId], + ) + }) + it('sends EOSE', async () => { const promise = (handler as any).fetchAndSend(subscriptionId, filters)