/* * Copyright (C) 2010 Apple Inc. All rights reserved. * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies) * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef Connection_h #define Connection_h #include "Arguments.h" #include "MessageDecoder.h" #include "MessageEncoder.h" #include "MessageReceiver.h" #include "ProcessType.h" #include #include #include #include #include #include #include #include #if OS(DARWIN) && !USE(UNIX_DOMAIN_SOCKETS) #include #include #include #endif #if PLATFORM(QT) || PLATFORM(GTK) || PLATFORM(EFL) #include "PlatformProcessIdentifier.h" #endif #if PLATFORM(GTK) #include "GSocketMonitor.h" #endif #if PLATFORM(QT) QT_BEGIN_NAMESPACE class QSocketNotifier; QT_END_NAMESPACE #endif namespace IPC { struct WaitForMessageState; enum MessageSendFlags { // Whether this message should be dispatched when waiting for a sync reply. // This is the default for synchronous messages. DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0, }; enum SyncMessageSendFlags { // Use this to inform that this sync call will suspend this process until the user responds with input. InformPlatformProcessWillSuspend = 1 << 0, UseFullySynchronousModeForTesting = 1 << 1, }; enum WaitForMessageFlags { // Use this to make waitForMessage be interrupted immediately by any incoming sync messages. InterruptWaitingIfSyncMessageArrives = 1 << 0, }; #define MESSAGE_CHECK_BASE(assertion, connection) do \ if (!(assertion)) { \ ASSERT(assertion); \ (connection)->markCurrentlyDispatchedMessageAsInvalid(); \ return; \ } \ while (0) class Connection : public ThreadSafeRefCounted { public: class Client : public MessageReceiver { public: virtual void didClose(Connection&) = 0; virtual void didReceiveInvalidMessage(Connection&, StringReference messageReceiverName, StringReference messageName) = 0; virtual IPC::ProcessType localProcessType() = 0; virtual IPC::ProcessType remoteProcessType() = 0; protected: virtual ~Client() { } }; class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted { }; #if USE(UNIX_DOMAIN_SOCKETS) typedef int Identifier; static bool identifierIsNull(Identifier identifier) { return identifier == -1; } struct SocketPair { int client; int server; }; enum ConnectionOptions { SetCloexecOnClient = 1 << 0, SetCloexecOnServer = 1 << 1, }; static Connection::SocketPair createPlatformConnection(unsigned options = SetCloexecOnClient | SetCloexecOnServer); #elif OS(DARWIN) struct Identifier { Identifier() : port(MACH_PORT_NULL) { } Identifier(mach_port_t port) : port(port) { } Identifier(mach_port_t port, OSObjectPtr xpcConnection) : port(port) , xpcConnection(WTFMove(xpcConnection)) { } mach_port_t port; OSObjectPtr xpcConnection; }; static bool identifierIsNull(Identifier identifier) { return identifier.port == MACH_PORT_NULL; } xpc_connection_t xpcConnection() const { return m_xpcConnection.get(); } bool getAuditToken(audit_token_t&); pid_t remoteProcessID() const; #elif OS(WINDOWS) typedef HANDLE Identifier; static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier); static bool identifierIsNull(Identifier identifier) { return !identifier; } #endif static Ref createServerConnection(Identifier, Client&); static Ref createClientConnection(Identifier, Client&); ~Connection(); Client* client() const { return m_client; } #if (PLATFORM(MAC) || (PLATFORM(QT) && USE(MACH_PORTS))) && __MAC_OS_X_VERSION_MIN_REQUIRED <= 101000 void setShouldCloseConnectionOnMachExceptions(); #elif PLATFORM(QT) && USE(UNIX_DOMAIN_SOCKETS) void setShouldCloseConnectionOnProcessTermination(WebKit::PlatformProcessIdentifier); #endif void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool); void setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure); // The set callback will be called on the connection work queue when the connection is closed, // before didCall is called on the client thread. Must be called before the connection is opened. // In the future we might want a more generic way to handle sync or async messages directly // on the work queue, for example if we want to handle them on some other thread we could avoid // handling the message on the client thread first. typedef void (*DidCloseOnConnectionWorkQueueCallback)(Connection*); void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback); void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue*, WorkQueueMessageReceiver*); void removeWorkQueueMessageReceiver(StringReference messageReceiverName); bool open(); void invalidate(); void markCurrentlyDispatchedMessageAsInvalid(); void postConnectionDidCloseOnConnectionWorkQueue(); template bool send(T&& message, uint64_t destinationID, unsigned messageSendFlags = 0); template bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), unsigned syncSendFlags = 0); template bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags = 0); std::unique_ptr createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID); bool sendMessage(std::unique_ptr, unsigned messageSendFlags = 0, bool alreadyRecordedMessage = false); std::unique_ptr sendSyncMessage(uint64_t syncRequestID, std::unique_ptr, std::chrono::milliseconds timeout, unsigned syncSendFlags = 0); std::unique_ptr sendSyncMessageFromSecondaryThread(uint64_t syncRequestID, std::unique_ptr, std::chrono::milliseconds timeout); bool sendSyncReply(std::unique_ptr); void wakeUpRunLoop(); void incrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { ++m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } void decrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { --m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; } bool inSendSync() const { return m_inSendSyncCount; } Identifier identifier() const; #if PLATFORM(COCOA) || (PLATFORM(QT) && USE(MACH_PORTS)) bool kill(); void terminateSoon(double intervalInSeconds); #endif bool isValid() const { return m_client; } #if HAVE(QOS_CLASSES) void setShouldBoostMainThreadOnSyncMessage(bool b) { m_shouldBoostMainThreadOnSyncMessage = b; } #endif uint64_t installIncomingSyncMessageCallback(std::function); void uninstallIncomingSyncMessageCallback(uint64_t); bool hasIncomingSyncMessage(); void allowFullySynchronousModeForTesting() { m_fullySynchronousModeIsAllowedForTesting = true; } private: Connection(Identifier, bool isServer, Client&); void platformInitialize(Identifier); void platformInvalidate(); std::unique_ptr waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags); std::unique_ptr waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags); // Called on the connection work queue. void processIncomingMessage(std::unique_ptr); void processIncomingSyncReply(std::unique_ptr); void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver&, MessageDecoder&); bool canSendOutgoingMessages() const; bool platformCanSendOutgoingMessages() const; void sendOutgoingMessages(); bool sendOutgoingMessage(std::unique_ptr); void connectionDidClose(); // Called on the listener thread. void dispatchOneMessage(); void dispatchMessage(std::unique_ptr); void dispatchMessage(MessageDecoder&); void dispatchSyncMessage(MessageDecoder&); void dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString); void didFailToSendSyncMessage(); // Can be called on any thread. void enqueueIncomingMessage(std::unique_ptr); void willSendSyncMessage(unsigned syncSendFlags); void didReceiveSyncReply(unsigned syncSendFlags); Client* m_client; bool m_isServer; std::atomic m_syncRequestID; bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage; bool m_shouldExitOnSyncMessageSendFailure; DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback; bool m_isConnected; Ref m_connectionQueue; HashMap, RefPtr>> m_workQueueMessageReceivers; unsigned m_inSendSyncCount; unsigned m_inDispatchMessageCount; unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; unsigned m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting { 0 }; bool m_fullySynchronousModeIsAllowedForTesting { false }; bool m_didReceiveInvalidMessage; // Incoming messages. Lock m_incomingMessagesMutex; Deque> m_incomingMessages; // Outgoing messages. Lock m_outgoingMessagesMutex; Deque> m_outgoingMessages; Condition m_waitForMessageCondition; Lock m_waitForMessageMutex; WaitForMessageState* m_waitingForMessage; // Represents a sync request for which we're waiting on a reply. struct PendingSyncReply { // The request ID. uint64_t syncRequestID; // The reply decoder, will be null if there was an error processing the sync // message on the other side. std::unique_ptr replyDecoder; // Will be set to true once a reply has been received. bool didReceiveReply; PendingSyncReply() : syncRequestID(0) , didReceiveReply(false) { } explicit PendingSyncReply(uint64_t syncRequestID) : syncRequestID(syncRequestID) , didReceiveReply(0) { } }; class SyncMessageState; friend class SyncMessageState; Lock m_syncReplyStateMutex; bool m_shouldWaitForSyncReplies; Vector m_pendingSyncReplies; class SecondaryThreadPendingSyncReply; typedef HashMap SecondaryThreadPendingSyncReplyMap; SecondaryThreadPendingSyncReplyMap m_secondaryThreadPendingSyncReplyMap; Lock m_incomingSyncMessageCallbackMutex; HashMap> m_incomingSyncMessageCallbacks; RefPtr m_incomingSyncMessageCallbackQueue; uint64_t m_nextIncomingSyncMessageCallbackID { 0 }; #if HAVE(QOS_CLASSES) pthread_t m_mainThread { 0 }; bool m_shouldBoostMainThreadOnSyncMessage { false }; #endif #if USE(UNIX_DOMAIN_SOCKETS) // Called on the connection queue. void readyReadHandler(); bool processMessage(); Vector m_readBuffer; Vector m_fileDescriptors; int m_socketDescriptor; #if PLATFORM(GTK) GSocketMonitor m_socketMonitor; #endif #if PLATFORM(QT) QSocketNotifier* m_socketNotifier; #endif #elif OS(DARWIN) // Called on the connection queue. void receiveSourceEventHandler(); void initializeDeadNameSource(); mach_port_t m_sendPort; dispatch_source_t m_deadNameSource; mach_port_t m_receivePort; dispatch_source_t m_receivePortDataAvailableSource; #if (PLATFORM(MAC) || (PLATFORM(QT) && USE(MACH_PORTS))) && __MAC_OS_X_VERSION_MIN_REQUIRED <= 101000 void exceptionSourceEventHandler(); // If setShouldCloseConnectionOnMachExceptions has been called, this has // the exception port that exceptions from the other end will be sent on. mach_port_t m_exceptionPort; dispatch_source_t m_exceptionPortDataAvailableSource; #endif OSObjectPtr m_xpcConnection; #elif OS(WINDOWS) // Called on the connection queue. void readEventHandler(); void writeEventHandler(); // Called by Connection::SyncMessageState::waitWhileDispatchingSentWin32Messages. // The absoluteTime is in seconds, starting on January 1, 1970. The time is assumed to use the // same time zone as WTF::currentTime(). Dispatches sent (not posted) messages to the passed-in // set of HWNDs until the semaphore is signaled or absoluteTime is reached. Returns true if the // semaphore is signaled, false otherwise. static bool dispatchSentMessagesUntil(const Vector& windows, WTF::BinarySemaphore& semaphore, double absoluteTime); Vector m_readBuffer; OVERLAPPED m_readState; std::unique_ptr m_pendingWriteEncoder; OVERLAPPED m_writeState; HANDLE m_connectionPipe; #endif }; template bool Connection::send(T&& message, uint64_t destinationID, unsigned messageSendFlags) { COMPILE_ASSERT(!T::isSync, AsyncMessageExpected); auto encoder = std::make_unique(T::receiverName(), T::name(), destinationID); encoder->encode(message.arguments()); return sendMessage(WTFMove(encoder), messageSendFlags); } template bool Connection::sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned syncSendFlags) { COMPILE_ASSERT(T::isSync, SyncMessageExpected); uint64_t syncRequestID = 0; std::unique_ptr encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID); if (syncSendFlags & SyncMessageSendFlags::UseFullySynchronousModeForTesting) { encoder->setFullySynchronousModeForTesting(); m_fullySynchronousModeIsAllowedForTesting = true; } // Encode the rest of the input arguments. encoder->encode(message.arguments()); // Now send the message and wait for a reply. std::unique_ptr replyDecoder = sendSyncMessage(syncRequestID, WTFMove(encoder), timeout, syncSendFlags); if (!replyDecoder) return false; // Decode the reply. return replyDecoder->decode(reply); } template bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags) { std::unique_ptr decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForMessageFlags); if (!decoder) return false; ASSERT(decoder->destinationID() == destinationID); m_client->didReceiveMessage(*this, *decoder); return true; } } // namespace IPC #endif // Connection_h