/****************************************************************************** File: BufferStream.h Copyright 1995-97, Be Incorporated ******************************************************************************/ #ifndef _BUFFER_STREAM_H #define _BUFFER_STREAM_H #include #include #include #include #include #include /* ================ Per-subscriber information. ================ */ struct _sbuf_info; typedef struct _sub_info { _sub_info *fNext; /* next subscriber in the stream*/ _sub_info *fPrev; /* previous subscriber in the stream */ _sbuf_info *fRel; /* next buf to be released */ _sbuf_info *fAcq; /* next buf to be acquired */ sem_id fSem; /* semaphore used for blocking */ bigtime_t fTotalTime; /* accumulated time between acq/rel */ int32 fHeld; /* # of buffers acq'd but not yet rel'd */ sem_id fBlockedOn; /* the semaphore being waited on */ /* or B_BAD_SEM_ID if not blocked */ } *subscriber_id; /* ================ Per-buffer information ================ */ typedef struct _sbuf_info { _sbuf_info *fNext; /* next "newer" buffer in the chain */ subscriber_id fAvailTo; /* next subscriber to acquire this buffer */ subscriber_id fHeldBy; /* subscriber that's acquired this buffer */ bigtime_t fAcqTime; /* time at which this buffer was acquired */ area_id fAreaID; /* for system memory allocation calls */ char *fAddress; int32 fSize; /* usable portion can be smaller than ... */ int32 fAreaSize; /* ... the size of the area. */ bool fIsFinal; /* TRUE => stream is stopping */ } *buffer_id; /* ================ Interface definition for BBufferStream class ================ */ /* We've chosen B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT to be small * enough so that a BBufferStream structure fits in one 4096 byte page. */ #define B_MAX_SUBSCRIBER_COUNT 52 #define B_MAX_BUFFER_COUNT 32 class BBufferStream; class BBufferStreamManager; typedef BBufferStream* stream_id; // for now class BAbstractBufferStream { public: virtual status_t GetStreamParameters(size_t *bufferSize, int32 *bufferCount, bool *isRunning, int32 *subscriberCount) const; virtual status_t SetStreamBuffers(size_t bufferSize, int32 bufferCount); virtual status_t StartStreaming(); virtual status_t StopStreaming(); protected: virtual void _ReservedAbstractBufferStream1(); virtual void _ReservedAbstractBufferStream2(); virtual void _ReservedAbstractBufferStream3(); virtual void _ReservedAbstractBufferStream4(); friend class BSubscriber; friend class BBufferStreamManager; virtual stream_id StreamID() const; /* stream identifier for direct access */ /* Create or delete a subscriber id for subsequent operations */ virtual status_t Subscribe(char *name, subscriber_id *subID, sem_id semID); virtual status_t Unsubscribe(subscriber_id subID); /* Enter into or quit the stream */ virtual status_t EnterStream(subscriber_id subID, subscriber_id neighbor, bool before); virtual status_t ExitStream(subscriber_id subID); virtual BMessenger* Server() const; /* message pipe to server */ status_t SendRPC(BMessage* msg, BMessage* reply = NULL) const; }; class BBufferStream : public BAbstractBufferStream { public: BBufferStream(size_t headerSize, BBufferStreamManager* controller, BSubscriber* headFeeder, BSubscriber* tailFeeder); virtual ~BBufferStream(); /* BBufferStreams are allocated on shared memory pages */ void *operator new(size_t size); void operator delete(void *stream, size_t size); /* Return header size */ size_t HeaderSize() const; /* These four functions are delegated to the stream controller */ status_t GetStreamParameters(size_t *bufferSize, int32 *bufferCount, bool *isRunning, int32 *subscriberCount) const; status_t SetStreamBuffers(size_t bufferSize, int32 bufferCount); status_t StartStreaming(); status_t StopStreaming(); /* Get the controller for delegation */ BBufferStreamManager *StreamManager() const; /* number of buffers in stream */ int32 CountBuffers() const; /* Create or delete a subscriber id for subsequent operations */ status_t Subscribe(char *name, subscriber_id *subID, sem_id semID); status_t Unsubscribe(subscriber_id subID); /* Enter into or quit the stream */ status_t EnterStream(subscriber_id subID, subscriber_id neighbor, bool before); status_t ExitStream(subscriber_id subID); /* queries about a subscriber */ bool IsSubscribed(subscriber_id subID); bool IsEntered(subscriber_id subID); status_t SubscriberInfo(subscriber_id subID, char** name, stream_id* streamID, int32* position); /* Force an error return of a subscriber if it's blocked */ status_t UnblockSubscriber(subscriber_id subID); /* Acquire and release a buffer */ status_t AcquireBuffer(subscriber_id subID, buffer_id *bufID, bigtime_t timeout); status_t ReleaseBuffer(subscriber_id subID); /* Get the attributes of a particular buffer */ size_t BufferSize(buffer_id bufID) const; char *BufferData(buffer_id bufID) const; bool IsFinalBuffer(buffer_id bufID) const; /* Get attributes of a particular subscriber */ int32 CountBuffersHeld(subscriber_id subID); /* Queries for the BBufferStream */ int32 CountSubscribers() const; int32 CountEnteredSubscribers() const; subscriber_id FirstSubscriber() const; subscriber_id LastSubscriber() const; subscriber_id NextSubscriber(subscriber_id subID); subscriber_id PrevSubscriber(subscriber_id subID); /* debugging aids */ void PrintStream(); void PrintBuffers(); void PrintSubscribers(); /* gaining exclusive access to the BBufferStream */ bool Lock(); void Unlock(); /* introduce a new buffer into the "newest" end of the chain */ status_t AddBuffer(buffer_id bufID); /* remove a buffer from the "oldest" end of the chain */ buffer_id RemoveBuffer(bool force); /* allocate a buffer from shared memory and create a bufID for it. */ buffer_id CreateBuffer(size_t size, bool isFinal); /* deallocate a buffer and returns its bufID to the freelist */ void DestroyBuffer(buffer_id bufID); /* remove and destroy any "newest" buffers from the head of the chain * that have not yet been claimed by any subscribers. If there are * no subscribers, this clears the entire chain. */ void RescindBuffers(); /* ================ Private member functions that assume locking already has been done. ================ */ private: virtual void _ReservedBufferStream1(); virtual void _ReservedBufferStream2(); virtual void _ReservedBufferStream3(); virtual void _ReservedBufferStream4(); /* initialize the free list of subscribers */ void InitSubscribers(); /* return TRUE if subID appears valid */ bool IsSubscribedSafe(subscriber_id subID) const; /* return TRUE if subID is entered into the stream */ bool IsEnteredSafe(subscriber_id subID) const; /* initialize the free list of buffer IDs */ void InitBuffers(); /* Wake a blocked subscriber */ status_t WakeSubscriber(subscriber_id subID); /* Give subID all the buffers it can get */ void InheritBuffers(subscriber_id subID); /* Relinquish any buffers held by subID */ void BequeathBuffers(subscriber_id subID); /* Fast version of ReleaseBuffer() */ status_t ReleaseBufferSafe(subscriber_id subID); /* Release a buffer to a subscriber */ status_t ReleaseBufferTo(buffer_id bufID, subscriber_id subID); /* deallocate all buffers */ void FreeAllBuffers(); /* deallocate all subscribers */ void FreeAllSubscribers(); /* ================ Private data members ================ */ BLocker fLock; area_id fAreaID; /* area id for this BBufferStream */ BBufferStreamManager *fStreamManager; BSubscriber *fHeadFeeder; BSubscriber *fTailFeeder; size_t fHeaderSize; /* ================ subscribers ================ */ _sub_info *fFreeSubs; /* free list of subscribers */ _sub_info *fFirstSub; /* first entered in itinierary */ _sub_info *fLastSub; /* last entered in itinerary */ sem_id fFirstSem; /* semaphore used by fFirstSub */ int32 fSubCount; int32 fEnteredSubCount; _sub_info fSubscribers[B_MAX_SUBSCRIBER_COUNT]; /* ================ buffers ================ */ _sbuf_info *fFreeBuffers; _sbuf_info *fOldestBuffer; /* first in line */ _sbuf_info *fNewestBuffer; /* fNewest->fNext = NULL */ int32 fCountBuffers; _sbuf_info fBuffers[B_MAX_BUFFER_COUNT]; uint32 _reserved[4]; }; #endif // #ifdef _BUFFER_STREAM_H